HEX
Server: LiteSpeed
System: Linux kapuas.iixcp.rumahweb.net 5.14.0-427.42.1.el9_4.x86_64 #1 SMP PREEMPT_DYNAMIC Fri Nov 1 14:58:02 EDT 2024 x86_64
User: mirz4654 (1666)
PHP: 8.1.33
Disabled: system,exec,escapeshellarg,escapeshellcmd,passthru,proc_close,proc_get_status,proc_nice,proc_open,proc_terminate,shell_exec,popen,pclose,dl,pfsockopen,leak,apache_child_terminate,posix_kill,posix_mkfifo,posix_setsid,posix_setuid,posix_setpgid,ini_alter,show_source,define_syslog_variables,symlink,syslog,openlog,openlog,closelog,ocinumcols,listen,chgrp,apache_note,apache_setenv,debugger_on,debugger_off,ftp_exec,dll,ftp,myshellexec,socket_bind,mail,posix_getwpuid
Upload Files
File: //usr/local/lib/python3.9/site-packages/kombu/transport/__pycache__/mongodb.cpython-39.pyc
a

X>h�=�@s�dZddlmZddlZddlmZddlZddlmZmZm	Z	ddl
mZddlm
Z
ddlmZdd	lmZdd
lmZmZddlmZddlmZd
dlmZd
dlmZdZdZGdd�d�ZGdd�dej �Z Gdd�dej!�Z!dS)aMongoDB transport module for kombu.

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: Yes
* Supports Priority: Yes
* Supports TTL: Yes

Connection String
=================
 *Unreviewed*

Transport Options
=================

* ``connect_timeout``,
* ``ssl``,
* ``ttl``,
* ``capped_queue_size``,
* ``default_hostname``,
* ``default_port``,
* ``default_database``,
* ``messages_collection``,
* ``routing_collection``,
* ``broadcast_collection``,
* ``queues_collection``,
* ``calc_queue_size``,
�)�annotationsN)�Empty)�MongoClient�errors�
uri_parser)�
CursorType)�VersionMismatch)�_detect_environment)�bytes_to_str)�dumps�loads)�cached_property)�maybe_sanitize_url�)�virtual��to_rabbitmq_queue_argumentsz3Kombu requires MongoDB version 1.3+ (server is {0})zKKombu requires MongoDB version 2.2+ (server is {0}) for TTL indexes supportc@sFeZdZdZdd�Zdd�Zdd�Zdd	d
�Zdd�Zd
d�Z	e	Z
dS)�BroadcastCursorzCursor for broadcast queues.cCs||_d|_|jdd�dS)NrF)�rewind)�_cursor�_offset�purge)�self�cursor�r�A/usr/local/lib/python3.9/site-packages/kombu/transport/mongodb.py�__init__CszBroadcastCursor.__init__cCs|jj�i�|jS�N)r�
collection�count_documentsr�rrrr�get_sizeHszBroadcastCursor.get_sizecCs|j��dSr)r�closer rrrr"KszBroadcastCursor.closeTcCs2|r|j��|jj�i�|_|j�|j�|_dSr)rrrrr�skip)rrrrrrNs
zBroadcastCursor.purgecCs|Srrr rrr�__iter__VszBroadcastCursor.__iter__c
Cspzt|j�}Wq^tjjyX}z.dt|�vrB|��WYd}~q�WYd}~qd}~00q^q|jd7_|S)Nznot valid at serverr)�nextr�pymongor�OperationFailure�strrr)r�msg�excrrr�__next__YszBroadcastCursor.__next__N)T)�__name__�
__module__�__qualname__�__doc__rr!r"rr$r+r%rrrrr@s
rcs\eZdZdZdZiZdZdZdZdZ	dZ
dZdZdZ
d	Zd
ZdZdZejjd
Z�fdd�Zdd�Zdd�Z�fdd�Zdd�Zdd�Zdd�Zdd�Zdd�Z�fd d!�ZdGd#d$�Zd%d&�Z d'd(�Z!dHd)d*�Z"d+d,�Z#d-d.�Z$d/d0�Z%e&d1d2��Z'e&d3d4��Z(e&d5d6��Z)e&d7d8��Z*e&d9d:��Z+d;d<�Z,d=d>�Z-d?d@�Z.dAdB�Z/dCdD�Z0dEdF�Z1�Z2S)I�ChannelzMongoDB Channel.TFNi��z	127.0.0.1i�iZ
kombu_default�messageszmessages.routingzmessages.broadcastzmessages.queues)�connect_timeout�ssl�ttl�capped_queue_size�default_hostname�default_port�default_database�messages_collection�routing_collection�broadcast_collection�queues_collection�calc_queue_sizecs"t�j|i|��i|_|jdSr)�superr�_broadcast_cursors�client)rZvargs�kwargs��	__class__rrr�szChannel.__init__c	Ks4|jr0|jjd|id|||�|d�d�idd�dS)N�_id�$set�	x-expires)rD�options�	expire_atT�Zupsert)r4�queues�
update_one�_get_queue_expire)r�queuerArrr�
_new_queue�s���	�zChannel._new_queuecCs�||jvr6zt|�|��}WqRty2d}YqR0n|jjd|idtjfgd�}|jrb|�	|�|durpt
��tt|d��S)NrM�priority)�sort�payload)
�_fanout_queuesr%�_get_broadcast_cursor�
StopIterationr1Zfind_one_and_deleter&Z	ASCENDINGr4�_update_queues_expirerrr
)rrMr)rrr�_get�s

�
zChannel._getcs:|jst��|�S||jvr*|�|���S|j�d|i�S�NrM)r=r>�_sizerRrSr!r1r�rrMrBrrrX�s

z
Channel._sizecKsrt|�||j|dd�d�}|jrb|�|d�|d<|�|�}|durb|ddusZ||dkrb||d<|j�|�dS)NT)�reverse)rQrMrOz
x-message-ttlrH)rZ_get_message_priorityr4rL�_get_message_expirer1�
insert_one)rrM�messagerA�dataZ
msg_expirerrr�_put�s�

�
�zChannel._putcKs|j�t|�|d��dS)N)rQrM)�	broadcastr\r)r�exchanger]�routing_keyrArrr�_put_fanout�s�zChannel._put_fanoutcCs8|�|�}||jvr$|�|���n|j�d|i�|SrW)rXrRrSrr1�delete_many)rrM�sizerrr�_purge�s


zChannel._purgecCs:t|jj|d�}|j�d|i�}|tdd�|D��BS)N�tableracss$|]}|d|d|dfVqdS)rb�patternrMNr)�.0�rrrr�	<genexpr>�s�z$Channel.get_table.<locals>.<genexpr>)�	frozenset�stateZ	exchanges�routing�find)rraZlocalRoutesZbrokerRoutesrrr�	get_table�s�
�zChannel.get_tablecCsp|�|�jdkr*|�||||�||j|<||||d�}|��}|jrV|�|d�|d<|jj|d|idd�dS)N�fanout)rarMrbrhrFrHrETrI)	Ztypeof�type�_create_broadcast_cursorrR�copyr4rLrnrK)rrarbrhrM�lookupr^rrr�_queue_bind�s�
�zChannel._queue_bindcs�|j�d|i�|jr&|j�d|i�t�j|fi|��||jvr|z|j�	|�}Wnt
yfYn0|��|j�	|�dS)NrMrD)rnrdr4rJZ
delete_oner>�queue_deleterRr?�pop�KeyErrorr")rrMrArrBrrrws
zChannel.queue_delete�
mongodb://cCs0|jj}|j}|�d�r$d}d|}|�|�s6||}|t|�d�sP||j7}|jr�d|vr�|�d�\}}|j}|jr�|d|j7}|d|d|}|j	r�|j	n|j
}tj||dd�}|d	p�|j
}	|	d
vr�|j}	d|j|jr�t|jd�ndd
�}
|
�|d�|�|
�}
d|
v�r&|
�d�||	|
fS)Nzsrv://zmongodb+srv://zmongodb+�@z://�:F)�validate�database)�/NTi�)�auto_start_requestr3ZconnectTimeoutMSrGZtlsr3)�
connectionr@�hostname�
startswith�lenr6Zuserid�split�password�portr7r�	parse_uriZvirtual_hostr8r3r2�int�update�_prepare_client_optionsrx)r�schemer@r��head�tail�credentialsr��parsed�dbnamerGrrr�
_parse_uris>


��


zChannel._parse_uricCsBtjdkr>|�dd�t|�d�t�r>tjj}||d|d<|S)N��r�Zreadpreference)r&�
version_tuplerx�
isinstance�getr�Zread_preferencesZ
_MONGOS_MODES)rrG�modesrrrr�Ks
zChannel._prepare_client_optionscKst|fi|��Srr)r�	argumentsrArrr�prepare_queue_argumentsSszChannel.prepare_queue_argumentscCs�|j|d�\}}}||d<t�}|dkr>ddlm}|��n|dkrXddlm}|�tfi|��}||}	|��d}
|
�	d	�d}
t
tt|
�	d
���}|dkr�t
t�|
���n|jr�|dkr�t
t�|
���|	S)
N)r��host�geventr)�monkey�eventlet)�monkey_patch�version�-�.)rr�)�r�)r�r	r�r�Z	patch_allr�r�rZserver_infor��tuple�mapr�r�E_SERVER_VERSION�formatr4�E_NO_TTL_INDEXES)rr�r�r��conf�envr�r�Z	mongoconnr~�version_strr�rrr�_openVs&
z
Channel._opencCs*|j|��vrdS|j|j|jdd�dS)z0Create capped collection for broadcast messages.NT)reZcapped)r;Zlist_collection_namesZcreate_collectionr5�rr~rrr�_create_broadcastqs�zChannel._create_broadcastcCs�||j}|jgd�dd�||j�dg�||j}|�ddg�|jr�|jdgdd�|jdgdd�||jjdgdd�d	S)
zEnsure indexes on collections.)�rMr)rOr)rDrT)Z
backgroundr�)rar)rHrr)ZexpireAfterSecondsN)r9Zcreate_indexr;r:r4r<)rr~r1rnrrr�_ensure_indexeszs
�

�zChannel._ensure_indexescCs |��}|�|�|�|�|S)zActually creates connection.)r�r�r�r�rrr�_create_client�s

zChannel._create_clientcCs|��Sr)r�r rrrr@�szChannel.clientcCs|j|jSr)r@r9r rrrr1�szChannel.messagescCs|j|jSr)r@r:r rrrrn�szChannel.routingcCs|j|jSr)r@r;r rrrr`�szChannel.broadcastcCs|j|jSr)r@r<r rrrrJ�szChannel.queuesc	Cs:z|j|WSty4|�|j|dd|�YS0dSr)r?ryrsrRrYrrrrS�s�zChannel._get_broadcast_cursorcCsRtjdkrd|itjd�}nd|idd�}|jjfi|��}t|�}|j|<|S)Nr�rM)�filterZcursor_typeT)�queryZtailable)r&r�rZTAILABLEr`rorr?)rrarbrhrMr�r�retrrrrs�s
��z Channel._create_broadcast_cursorcCs6|�di��d�}|dur2|��tjt|�d�SdS)NZ
propertiesZ
expiration�Zmilliseconds)r��get_now�datetime�	timedeltar�)rr]�valuerrrr[�szChannel._get_message_expirec	Csnt|t�r,|j�d|i�}|s"dS|d}n|}z|d|}WnttfyXYdS0|��tj|d�S)z�Get expiration header named `argument` of queue definition.

        Note:
        ----
            `queue` must be either queue name or options itself.
        rDNrGr�r�)	r�r(rJZfind_onery�	TypeErrorr�r�r�)rrM�argument�docr^r�rrrrL�s

zChannel._get_queue_expirecCsL|�|d�}|sdS|j�d|idd|ii�|j�d|idd|ii�dS)z,Update expiration field on queues documents.rFNrMrErHrD)rLrnZupdate_manyrJ)rrMrHrrrrU�s��zChannel._update_queues_expirecCs
tj��S)zReturn current time in UTC.)r��utcnowr rrrr��szChannel.get_now)rz)rz)3r,r-r.r/Zsupports_fanoutrRr3r4r2r5r=r6r7r8r9r:r;r<rr0Zfrom_transport_optionsrrNrVrXr_rcrfrprvrwr�r�r�r�r�r�r�r
r@r1rnr`rJrSrsr[rLrUr��
__classcell__rrrBrr0osb


2
	




r0c@s�eZdZdZeZdZdZejZej	j
ejfZ
ej	j
ejejfZ
dZdZej	jjegd��d�Zdd	�Zdddd
�dd�ZdS)�	TransportzMongoDB Transport.TrZmongodbr&)�directZtopicrq)Z
exchange_typecCstjSr)r&r�r rrr�driver_version	szTransport.driver_versionF�**r()�uri�returncCsB|sdS|r|Sd|vr t|�S|�dd�\}}d�t|�|g�S)Nrz�,r)rr��join)rr�Zinclude_password�maskZuri1�	remainderrrr�as_uriszTransport.as_uriN)Fr�)r,r-r.r/r0Z
can_parse_urlZpolling_intervalr7rr�Zconnection_errorsrZConnectionFailureZchannel_errorsr'Zdriver_typeZdriver_nameZ
implements�extendrlr�r�rrrrr��s&���
�r�)"r/�
__future__rr�rMrr&rrrZpymongo.cursorrZkombu.exceptionsrZkombu.utils.compatr	Zkombu.utils.encodingr
Zkombu.utils.jsonrrZkombu.utils.objectsr
Zkombu.utils.urlr�r�baserr�r�rr0r�rrrr�<module>s, /