File: //usr/local/lib/python3.9/site-packages/kombu/transport/__pycache__/consul.cpython-39.pyc
"""Consul Transport module for Kombu.

Overview
========

This transport uses Consul.io's Key/Value store to transport messages in queues.

It uses python-consul to talk to Consul's HTTP API.

Features
========
* Type: Native
* Supports Direct: Yes
* Supports Topic: *Unreviewed*
* Supports Fanout: *Unreviewed*
* Supports Priority: *Unreviewed*
* Supports TTL: *Unreviewed*

Connection String
=================

Connection string has the following format:

.. code-block::

    consul://CONSUL_ADDRESS[:PORT]
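
As a rough illustration of the format above, the sketch below splits a
connection string into a host and a port using only the standard library.
``parse_consul_url`` is a hypothetical helper, not part of Kombu, and the
fallback values (``localhost`` and port 8500, Consul's usual HTTP port) are
assumptions made for the example; Kombu's own URL handling may differ.

```python
from urllib.parse import urlsplit

# Assumed fallbacks for this example only.
DEFAULT_HOST = 'localhost'
DEFAULT_PORT = 8500


def parse_consul_url(url):
    # Hypothetical helper, not part of Kombu: split a consul:// URL
    # into a (host, port) pair, falling back to the defaults above.
    parts = urlsplit(url)
    if parts.scheme != 'consul':
        raise ValueError('expected a consul:// URL, got %r' % (url,))
    return parts.hostname or DEFAULT_HOST, parts.port or DEFAULT_PORT


print(parse_consul_url('consul://consul.example.com:8501'))
print(parse_consul_url('consul://'))
```

Omitting the address or the port falls back to the assumed defaults, which
matches the optional ``[:PORT]`` part of the format.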

"""

from __future__ import annotations

import socket
import uuid
from collections import defaultdict
from contextlib import contextmanager
from queue import Empty
from time import monotonic

from kombu.exceptions import ChannelError
from kombu.log import get_logger
from kombu.utils.json import dumps, loads
from kombu.utils.objects import cached_property

from . import virtual

try:
    import consul
except ImportError:
    consul = None

logger = get_logger('kombu.transport.consul')

DEFAULT_PORT = 8500
DEFAULT_HOST = 'localhost'


class LockError(Exception):
    """An error occurred while trying to acquire the lock."""


class Channel(virtual.Channel):
    """Consul Channel class which talks to the Consul Key/Value store."""

    prefix = 'kombu'
    index = None
    timeout = '10s'
    session_ttl = 30

    def __init__(self, *args, **kwargs):
        if consul is None:
            raise ImportError('Missing python-consul library')

        super().__init__(*args, **kwargs)

        port = self.connection.client.port or self.connection.default_port
        host = self.connection.client.hostname or DEFAULT_HOST

        logger.debug('Host: %s Port: %s Timeout: %s', host, port,
                     self.timeout)

        self.queues = defaultdict(dict)

        self.client = consul.Consul(host=host, port=int(port))

    def _lock_key(self, queue):
        return f'{self.prefix}/{queue}.lock'

    def _key_prefix(self, queue):
        return f'{self.prefix}/{queue}'

    def _get_or_create_session(self, queue):
        """Get or create consul session.

        Try to renew the session if it exists, otherwise create a new
        session in Consul.

        This session is used to acquire a lock inside Consul so that we achieve
        read-consistency between the nodes.

        Arguments:
        ---------
            queue (str): The name of the Queue.

        Returns
        -------
            str: The ID of the session.
        """
        try:
            session_id = self.queues[queue]['session_id']
        except KeyError:
            session_id = None
        return (self._renew_existing_session(session_id)
                if session_id is not None else self._create_new_session())

    def _renew_existing_session(self, session_id):
        logger.debug('Trying to renew existing session %s', session_id)
        session = self.client.session.renew(session_id=session_id)
        return session.get('ID')

    def _create_new_session(self):
        logger.debug('Creating session %s with TTL %s',
                     self.lock_name, self.session_ttl)
        session_id = self.client.session.create(
            name=self.lock_name, ttl=self.session_ttl)
        logger.debug('Created session %s with id %s',
                     self.lock_name, session_id)
        return session_id

    @contextmanager
    def _queue_lock(self, queue, raising=LockError):
        """Try to acquire a lock on the Queue.

        It does so by creating an object called 'lock' which is locked by the
        current session.

        This way other nodes are not able to write to the lock object which
        means that they have to wait before the lock is released.

        Arguments:
        ---------
            queue (str): The name of the Queue.
            raising (Exception): Set custom lock error class.

        Raises
        ------
            LockError: if the lock cannot be acquired.

        Returns
        -------
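            bool: success?
        """
        self._acquire_lock(queue, raising=raising)
        try:
            yield
        finally:
            self._release_lock(queue)

    def _acquire_lock(self, queue, raising=LockError):
        session_id = self._get_or_create_session(queue)
        lock_key = self._lock_key(queue)

        logger.debug('Trying to create lock object %s with session %s',
                     lock_key, session_id)

        if self.client.kv.put(key=lock_key,
                              acquire=session_id,
                              value=self.lock_name):
            self.queues[queue]['session_id'] = session_id
            return
        logger.info('Could not acquire lock on key %s', lock_key)
        raise raising()

    def _release_lock(self, queue):
        """Try to release a lock.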

        It does so by simply removing the lock key in Consul.

        Arguments:
        ---------
            queue (str): The name of the queue we want to release
                the lock from.
        """
        logger.debug('Removing lock key %s', self._lock_key(queue))
        self.client.kv.delete(key=self._lock_key(queue))

    def _destroy_session(self, queue):
        """Destroy a previously created Consul session.

        Will release all locks it still might hold.

        Arguments:
        ---------
            queue (str): The name of the Queue.
        """
        logger.debug('Destroying session %s',
                     self.queues[queue]['session_id'])
        self.client.session.destroy(self.queues[queue]['session_id'])

    def _new_queue(self, queue, **_):
        self.queues[queue] = {'session_id': None}
        return self.client.kv.put(key=self._key_prefix(queue), value=None)

    def _delete(self, queue, *args, **_):
        self._destroy_session(queue)
        self.queues.pop(queue, None)
        self._purge(queue)

    def _put(self, queue, payload, **_):
        """Put `message` onto `queue`.

        This simply writes a key to the K/V store of Consul.
        """
        key = '{}/msg/{}_{}'.format(
            self._key_prefix(queue),
            int(round(monotonic() * 1000)),
            uuid.uuid4(),
        )
        if not self.client.kv.put(key=key, value=dumps(payload), cas=0):
            raise ChannelError(f'Cannot add key {key!r} to consul')

    def _get(self, queue, timeout=None):
        """Get the first available message from the queue.

        Before it does so it acquires a lock on the Key/Value store so
        only one node reads at the same time. This is for read consistency.
        """
        with self._queue_lock(queue, raising=Empty):
            key = f'{self._key_prefix(queue)}/msg/'
            logger.debug('Fetching key %s with index %s', key, self.index)
            self.index, data = self.client.kv.get(
                key=key, recurse=True,
                index=self.index, wait=self.timeout,
            )

            try:
                if data is None:
                    raise Empty()

                logger.debug('Removing key %s with modifyindex %s',
                             data[0]['Key'], data[0]['ModifyIndex'])

                self.client.kv.delete(key=data[0]['Key'],
                                      cas=data[0]['ModifyIndex'])

                return loads(data[0]['Value'])
            except TypeError:
                pass

        raise Empty()

    def _purge(self, queue):
        self._destroy_session(queue)
        return self.client.kv.delete(
            key=f'{self._key_prefix(queue)}/msg/',
            recurse=True,
        )

    def _size(self, queue):
        size = 0
        try:
            key = f'{self._key_prefix(queue)}/msg/'
            logger.debug('Fetching key recursively %s with index %s',
                         key, self.index)
            self.index, data = self.client.kv.get(
                key=key, recurse=True,
                index=self.index, wait=self.timeout,
            )
            size = len(data)
        except TypeError:
            pass

        logger.debug('Found %s keys under %s with index %s',
                     size, key, self.index)
        return size

    @cached_property
    def lock_name(self):
        return f'{socket.gethostname()}'


class Transport(virtual.Transport):
    """Consul K/V storage Transport for Kombu."""

    Channel = Channel

    default_port = DEFAULT_PORT
    driver_type = 'consul'
    driver_name = 'consul'

    if consul:
        connection_errors = (
            virtual.Transport.connection_errors + (
                consul.ConsulException, consul.base.ConsulException
            )
        )

        channel_errors = (
            virtual.Transport.channel_errors + (
                consul.ConsulException, consul.base.ConsulException
            )
        )

    def __init__(self, *args, **kwargs):
        if consul is None:
            raise ImportError('Missing python-consul library')

        super().__init__(*args, **kwargs)

    def verify_connection(self, connection):
        port = connection.client.port or self.default_port
        host = connection.client.hostname or DEFAULT_HOST

        logger.debug('Verify Consul connection to %s:%s', host, port)

        try:
            client = consul.Consul(host=host, port=int(port))
            client.agent.self()
            return True
        except ValueError:
            pass

        return False

    def driver_version(self):
        return consul.__version__