File: //usr/local/lib/python3.9/site-packages/kombu/transport/__pycache__/consul.cpython-39.pyc
a
X>h�$ � @ s� d Z ddlmZ ddlZddlZddlmZ ddlmZ ddl m
Z
ddlmZ ddl
mZ dd lmZ dd
lmZmZ ddlmZ dd
lmZ zddlZW n ey� dZY n0 ed�ZdZdZG dd� de�ZG dd� dej�ZG dd� dej �Z dS )a� Consul Transport module for Kombu.
Overview
========
It uses Consul.io's Key/Value store to transport messages in queues.
It uses python-consul to talk to Consul's HTTP API.
Features
========
* Type: Native
* Supports Direct: Yes
* Supports Topic: *Unreviewed*
* Supports Fanout: *Unreviewed*
* Supports Priority: *Unreviewed*
* Supports TTL: *Unreviewed*
Connection String
=================
The connection string has the following format:
.. code-block::
consul://CONSUL_ADDRESS[:PORT]
� )�annotationsN)�defaultdict)�contextmanager)�Empty)� monotonic)�ChannelError)�
get_logger)�dumps�loads)�cached_property� )�virtualzkombu.transport.consuli4! � localhostc @ s e Zd ZdZdS )� LockErrorz3An error occurred while trying to acquire the lock.N)�__name__�
__module__�__qualname__�__doc__� r r �@/usr/local/lib/python3.9/site-packages/kombu/transport/consul.pyr 9 s r c s� e Zd ZdZdZdZdZdZ� fdd�Zdd � Z d
d� Z
dd
� Zdd� Zdd� Z
eefdd��Zefdd�Zdd� Zdd� Zdd� Zdd� Zdd� Zd(d d!�Zd"d#� Zd$d%� Zed&d'� �Z� ZS ))�Channelz?Consul Channel class which talks to the Consul Key/Value store.ZkombuNZ10s� c sv t d u rtd��t� j|i |�� | jjjp2| jj}| jjjp@t }t
�d||| j� t
t�| _t j|t|�d�| _d S )N�Missing python-consul libraryzHost: %s Port: %s Timeout: %s��host�port)�consul�ImportError�super�__init__�
connection�clientr �default_port�hostname�DEFAULT_HOST�logger�debug�timeoutr �dict�queues�Consul�int)�self�args�kwargsr r �� __class__r r r E s
zChannel.__init__c C s | j � d|� d�S )N�/z.lock��prefix�r, �queuer r r � _lock_keyT s zChannel._lock_keyc C s | j � d|� �S )Nr1 r2 r4 r r r �_key_prefixW s zChannel._key_prefixc C sD z| j | d }W n ty( d}Y n0 |dur<| �|�S | �� S )a� Get or create consul session.
Try to renew the session if it exists, otherwise create a new
session in Consul.
This session is used to acquire a lock inside Consul so that we achieve
read-consistency between the nodes.
Arguments:
---------
queue (str): The name of the Queue.
Returns
-------
str: The ID of the session.
�
session_idN)r) �KeyError�_renew_existing_session�_create_new_session)r, r5 r8 r r r �_get_or_create_sessionZ s
��zChannel._get_or_create_sessionc C s&