File: //usr/local/lib/python3.9/site-packages/kombu/transport/__pycache__/azureservicebus.cpython-39.pyc
Azure Service Bus Message Queue transport module for kombu.

Note that the Shared Access Policy used to connect to Azure Service Bus
requires Manage, Send and Listen claims since the broker will create new
queues and delete old queues as required.


Note for Celery users who see programs that do not terminate properly:
the Azure Service Bus SDK uses the Azure uAMQP library, which in turn creates
some threads. When the AzureServiceBus Channel is closed, those threads are
shut down properly, but Celery does not always close the channel, so the
threads can be left running. Because the uAMQP threads are not marked as
daemon threads, they are not killed when the main thread exits. Setting the
``uamqp_keep_alive_interval`` transport option to 0 prevents the keep-alive
thread from starting.
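
As a minimal sketch of the workaround described above, assuming a Celery
configuration module that uses Celery's ``broker_transport_options`` setting
(the broker URL below is a placeholder, not a working credential):

```python
# celeryconfig.py (hypothetical file name)
# Placeholder broker URL in the format documented under "Connection String".
broker_url = "azureservicebus://SAS_POLICY_NAME:SAS_KEY@SERVICE_BUSNAMESPACE"

# Setting the interval to 0 prevents the uAMQP keep-alive thread from starting,
# so no non-daemon keep-alive thread can outlive the main thread.
broker_transport_options = {
    "uamqp_keep_alive_interval": 0,
}
```

This only disables the keep-alive thread; whether it resolves a given
shutdown problem depends on the application.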


More information about Azure Service Bus:
https://azure.microsoft.com/en-us/services/service-bus/

Features
========
* Type: Virtual
* Supports Direct: *Unreviewed*
* Supports Topic: *Unreviewed*
* Supports Fanout: *Unreviewed*
* Supports Priority: *Unreviewed*
* Supports TTL: *Unreviewed*

Connection String
=================

The connection string takes one of the following formats:

.. code-block::

    azureservicebus://SAS_POLICY_NAME:SAS_KEY@SERVICE_BUSNAMESPACE
    azureservicebus://DefaultAzureCredential@SERVICE_BUSNAMESPACE
    azureservicebus://ManagedIdentityCredential@SERVICE_BUSNAMESPACE
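
To illustrate how the formats above break down into their parts, here is a
minimal standard-library sketch. ``split_uri`` is a hypothetical helper written
for this example; it is not the transport's own parser and performs no
credential lookup.

```python
from __future__ import annotations

SCHEME = "azureservicebus://"


def split_uri(uri: str) -> tuple[str, str | None, str]:
    """Hypothetical helper: return (policy_or_credential, sas_key, namespace)."""
    if not uri.startswith(SCHEME):
        raise ValueError(f"expected a URI starting with {SCHEME!r}")
    rest = uri[len(SCHEME):]
    # Split on the last "@" so an "@" inside the key does not break parsing.
    credential, sep, namespace = rest.rpartition("@")
    if not sep or not credential or not namespace:
        raise ValueError("expected <credential>@<namespace> after the scheme")
    if ":" in credential:
        # SAS form: policy name and key are separated by the first ":".
        policy, sas_key = credential.split(":", 1)
        return policy, sas_key, namespace
    # Credential-name form, e.g. DefaultAzureCredential: there is no key.
    return credential, None, namespace


print(split_uri("azureservicebus://SAS_POLICY_NAME:SAS_KEY@SERVICE_BUSNAMESPACE"))
print(split_uri("azureservicebus://DefaultAzureCredential@SERVICE_BUSNAMESPACE"))
```
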

Transport Options
=================

* ``queue_name_prefix`` - String prefix to prepend to queue names in a
  Service Bus namespace.
* ``wait_time_seconds`` - Number of seconds to wait to receive messages.
  Default ``5``.
* ``peek_lock_seconds`` - Number of seconds the message is visible for before
  it is requeued and sent to another consumer. Default ``60``.
* ``uamqp_keep_alive_interval`` - Interval in seconds at which the Azure uAMQP
  library should send keepalive messages. Default ``30``.
* ``retry_total`` - Azure SDK retry total. Default ``3``.
* ``retry_backoff_factor`` - Azure SDK exponential backoff factor.
  Default ``0.8``.
* ``retry_backoff_max`` - Azure SDK maximum backoff interval in seconds.
  Default ``120``.
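
The options above can be collected in a plain dictionary. The following is a
minimal sketch that merges user overrides onto the documented defaults;
``DOCUMENTED_DEFAULTS`` and ``resolve_options`` are hypothetical names used
only for this example, and ``queue_name_prefix`` has no default listed here,
so it is accepted only as an override.

```python
from __future__ import annotations

from typing import Any

# Defaults exactly as listed in the option descriptions above.
DOCUMENTED_DEFAULTS: dict[str, Any] = {
    "wait_time_seconds": 5,
    "peek_lock_seconds": 60,
    "uamqp_keep_alive_interval": 30,
    "retry_total": 3,
    "retry_backoff_factor": 0.8,
    "retry_backoff_max": 120,
}

KNOWN_OPTIONS = set(DOCUMENTED_DEFAULTS) | {"queue_name_prefix"}


def resolve_options(overrides: dict[str, Any] | None = None) -> dict[str, Any]:
    """Hypothetical helper: documented defaults with user overrides applied."""
    overrides = overrides or {}
    unknown = set(overrides) - KNOWN_OPTIONS
    if unknown:
        # Reject misspelled keys early instead of silently ignoring them.
        raise KeyError(f"unknown transport options: {sorted(unknown)}")
    return {**DOCUMENTED_DEFAULTS, **overrides}


transport_options = resolve_options(
    {"queue_name_prefix": "dev-", "wait_time_seconds": 10}
)
```

The resulting dictionary is the kind of mapping that would be passed as
transport options when creating the connection.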