HEX
Server: LiteSpeed
System: Linux kapuas.iixcp.rumahweb.net 5.14.0-427.42.1.el9_4.x86_64 #1 SMP PREEMPT_DYNAMIC Fri Nov 1 14:58:02 EDT 2024 x86_64
User: mirz4654 (1666)
PHP: 8.1.33
Disabled: system,exec,escapeshellarg,escapeshellcmd,passthru,proc_close,proc_get_status,proc_nice,proc_open,proc_terminate,shell_exec,popen,pclose,dl,pfsockopen,leak,apache_child_terminate,posix_kill,posix_mkfifo,posix_setsid,posix_setuid,posix_setpgid,ini_alter,show_source,define_syslog_variables,symlink,syslog,openlog,openlog,closelog,ocinumcols,listen,chgrp,apache_note,apache_setenv,debugger_on,debugger_off,ftp_exec,dll,ftp,myshellexec,socket_bind,mail,posix_getwpuid
Upload Files
File: //usr/local/lib/python3.9/site-packages/kombu/transport/__pycache__/gcpubsub.cpython-39.pyc
a

X>h�k�@sdZddlmZddlZddlZddlZddlZddlmZm	Z	m
Z
mZddlm
Z
ddlmZddlmZddlmZdd	lmZmZdd
lmZmZddlmZddlmZdd
lmZmZm Z ddl!m"Z"ddl#m$Z$ddl%m&Z&ddl'm(Z(m)Z)ddl'm*Z+ddl,m*Z-ddl.m*Z/ddl0m1Z2ddl3m4Z4ddl5m6Z6ddl7m8Z8m9Z9ddl:m;Z;m<Z<ddl=m>Z>ddl?m@Z@e6d�ZAeBejC�hd�ZDeEd�eEd�idd �eDD��ZFGd!d"�d"�ZGGd#d$�d$�ZHejIGd%d&�d&��ZJGd'd(�d(e@jK�ZKGd)d*�d*e@jL�ZLdS)+apGCP Pub/Sub transport module for kombu.

More information about GCP Pub/Sub:
https://cloud.google.com/pubsub

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: No
* Supports Fanout: Yes
* Supports Priority: No
* Supports TTL: No

Connection String
=================

Connection string has the following formats:

.. code-block::

    gcpubsub://projects/project-name

Transport Options
=================
* ``queue_name_prefix``: (str) Prefix for queue names.
* ``ack_deadline_seconds``: (int) The maximum time after receiving a message
  and acknowledging it before pub/sub redelivers the message.
* ``expiration_seconds``: (int) Subscriptions without any subscriber
  activity or changes made to their properties are removed after this period.
  Examples of subscriber activities include open connections,
  active pulls, or successful pushes.
* ``wait_time_seconds``: (int) The maximum time to wait for new messages.
  Defaults to 10.
* ``retry_timeout_seconds``: (int) The maximum time to wait before retrying.
* ``bulk_max_messages``: (int) The maximum number of messages to pull in bulk.
  Defaults to 32.
�)�annotationsN)�FIRST_COMPLETED�Future�ThreadPoolExecutor�wait)�suppress)�getpid)�Empty)�Lock)�	monotonic�sleep)�
NAMESPACE_OID�uuid3)�gethostname��timeout)�
AlreadyExists�DeadlineExceeded�PermissionDenied)�Retry)�
monitoring_v3)�query)�PublisherClient�SubscriberClient)�
exceptions)�
gapic_version)�TRANSIENT_DELIVERY_MODE)�
get_logger)�bytes_to_str�safe_str)�dumps�loads)�cached_property�)�virtualzkombu.transport.gcpubsub>�.�-�_r%r&cCsi|]}t|�td��qS)r')�ord)�.0�c�r+�B/usr/local/lib/python3.9/site-packages/kombu/transport/gcpubsub.py�
<dictcomp>S�r-c@sPeZdZdZdd�Zdd�Zdd�dd	�Zddd�Zd
d�Zdd�Z	dd�Z
dS)�
UnackedIdszThreadsafe list of ack_ids.cCsg|_t�|_dS�N)�_listr
�_lock��selfr+r+r,�__init__ZszUnackedIds.__init__cCs|j�|�dSr0)r1�append�r4�valr+r+r,r6^szUnackedIds.append�list)�valscCs|j�|�dSr0)r1�extend)r4r:r+r+r,r;bszUnackedIds.extend���cCs6|j�|j�|�Wd�S1s(0YdSr0)r2r1�pop)r4�indexr+r+r,r=fszUnackedIds.popc	Cs^|j�Dtt��|j�|�Wd�n1s20YWd�n1sP0YdSr0)r2r�
ValueErrorr1�remover7r+r+r,r@jszUnackedIds.removecCs4|j�t|j�Wd�S1s&0YdSr0)r2�lenr1r3r+r+r,�__len__nszUnackedIds.__len__cCs
|j|Sr0)r1)r4�itemr+r+r,�__getitem__rszUnackedIds.__getitem__N)r<)�__name__�
__module__�__qualname__�__doc__r5r6r;r=r@rBrDr+r+r+r,r/Ws
r/c@s6eZdZdZd
dd�Zddd�Zddd	�Zd
d�ZdS)�
AtomicCounterzIThreadsafe counter.

    Returns the value after inc/dec operations.
    rcCs||_t�|_dSr0)�_valuer
r2)r4�initialr+r+r,r5}szAtomicCounter.__init__r#cCs>|j�$|j|7_|jWd�S1s00YdSr0�r2rJ�r4�nr+r+r,�inc�szAtomicCounter.inccCs>|j�$|j|8_|jWd�S1s00YdSr0rLrMr+r+r,�dec�szAtomicCounter.deccCs0|j�|jWd�S1s"0YdSr0rLr3r+r+r,�get�szAtomicCounter.getN)r)r#)r#)rErFrGrHr5rOrPrQr+r+r+r,rIws



rIc@sFeZdZUdZded<ded<ded<ded<ejed�Zded	<d
S)�QueueDescriptorzPub/Sub queue descriptor.�str�name�
topic_path�subscription_id�subscription_path)�default_factoryr/�unacked_idsN)	rErFrGrH�__annotations__�dataclasses�fieldr/rYr+r+r+r,rR�s
rRcs*eZdZUdZdZdZdZdZdZdZ	dZ
dZe�Z
d	Zd
ed<e��Ze�ZiZded
<e�Zded<�fdd�Zefddd�dd�Zdd�Zdbddddd�dd�Zddd�dd�Zdcddddddd �d!d"�Zd#d$�Zd%d&�Zd'd(�Z dddd)d*�d+d,�Z!d-d.�d/d0�Z"dd)d*�d1d2�Z#dd3�d4d5�Z$de�fd6d7�	Z%ddd8�d9d:�Z&df�fd;d<�	Z'd=dd>�d?d@�Z(ddA�dBdC�Z)dDdE�Z*ddA�dFdG�Z+e,dHdI��Z-e,dJdK��Z.e,dLdM��Z/e0dNdO��Z1e0dPdQ��Z2e,dRdS��Z3e,dTdU��Z4e,dVdW��Z5e,dXdY��Z6e,dZd[��Z7e,d\d]��Z8�fd^d_�Z9e:d`da��Z;�Z<S)g�ChannelzGCP Pub/Sub channel.TF�
��i�Qi,� Nzthreading.Thread�_unacked_extenderzdict[str, QueueDescriptor]�_queue_cachezset[str]�_tmp_subscriptionscsrt�j|i|��t�|_t�d|jj�t�	|jj�|_
|j��dkrnt
j|jdd�t_|j��tj��dS)Nznew GCP pub/sub channel: %sr#T)�target�daemon)�superr5r�pool�logger�info�conninfo�hostname�	Transport�	parse_uri�
project_id�_n_channelsrO�	threading�Thread�_extend_unacked_deadliner]ra�_stop_extender�clear�start)r4�args�kwargs��	__class__r+r,r5�s�
zChannel.__init__rS)rT�returncCs(|�|j�s|j|}tt|���|�S)z7Format AMQP queue name into a valid Pub/Sub queue name.)�
startswith�queue_name_prefixrSr�	translate)r4rT�tabler+r+r,�entity_name�s
zChannel.entity_namec
Cs
|�|�j}|�|�}t�d||||�i}|dkr\dd|�d�i}|j�|j|�}|j}nn|dkr�t	t
t��dt�����}	|�d|	��}
|j�|j|
�}|j
�|�|j�|�d	}ntd
|�d���|�|j||�}|j||||d�t||||d
�}||j|<dS)Nz9binding queue: %s to %s exchange: %s with routing_key: %s�direct�filterzattributes.routing_key="�"�fanoutr%r&iXzexchange type z not implemented)rUrW�filter_args�
msg_retention)rTrUrVrW)�typeof�typerrh�debug�
subscriberrWrn�expiration_secondsrr
rrrc�add�_fanout_exchanges�NotImplementedError�
_create_topic�_create_subscriptionrRrb)
r4�exchange�routing_key�pattern�queue�
exchange_typer�rW�message_retention_duration�uidZ
uniq_sub_nameZexchange_topic�qdescr+r+r,�_queue_bind�sZ
����
����zChannel._queue_bind�int)rn�topic_idr�rzcCsx|j�||�}|�|�r(t�d|�|Sz8t�d|�d|i}|rP|�d�|d<|jj|d�WntyrYn0|S)Nztopic: %s existszcreating topic: %srT�sr���request)�	publisherrU�_is_topic_existsrhr�Zcreate_topicr)r4rnr�r�rUr�r+r+r,r�s 
��zChannel._create_topic�bool)rUrzcCs8|jjdd|j��id�}|D]}|j|krdSqdS)N�projectz	projects/r�TF)r�Zlist_topicsrnrT)r4rUZtopics�tr+r+r,r�s�
zChannel._is_topic_exists)rnr�rUrWr�rzcCs�|p|j�|j|�}|p$|j�||�}zTt�d|||�|p@|j}|jj|||j	d|j�d�i|�d�d�|ppi�d�Wnt
y�Yn0|S)Nz0creating subscription: %s, topic: %s, filter: %sZttlr�)rTZtopic�ack_deadline_secondsZexpiration_policyr�r�)r�rWrnr�rUrhr�r�Zcreate_subscriptionr�r)r4rnr�rUrWr�r�r+r+r,r�#s8
�
��
����
zChannel._create_subscriptioncOsP|�|�}t�d|�|j�|�}|s*dS|jjd|jid�|j�|d�dS)zDelete a queue by name.zdeleting queue: %sN�subscriptionr�)	rrhrirbrQr��delete_subscriptionrWr=)r4r�rvrwr�r+r+r,�_deleteKs
�zChannel._deletecKsV|�|�}|j|}|�|�}t�d||j|�t|�}|jj|j|�	d�|d�dS)zPut a message onto the queue.z8putting message to queue: %s, topic: %s, routing_key: %s�utf-8)r�N)
rrb�_get_routing_keyrhr�rUr r��publish�encode)r4r��messagerwr�r��encoded_messager+r+r,�_putWs


��zChannel._putcKsV|�||�|j�|j|�}t�d||�t|�}|jj||�d�t	|j
d�d�dS)z#Put a message onto fanout exchange.z-putting msg to fanout exchange: %s, topic: %sr���deadline)�retryN)�_lookupr�rUrnrhr�r r�r�r�retry_timeout_seconds)r4r�r�r�rwrUr�r+r+r,�_put_fanoutis�
�zChannel._put_fanout�float)r�rc	Cs�|�|�}|j|}z,|jj|jdd�t|jd�|p8|jd�}WntyXt	��Yn0t
|j�dkrnt	��|jd}|j}t
|jj�}|dd}t�d|||d�|�|d�r�t�d	|�|�|g|j�n$|||jj|jd
�|d<|j�|�|S)z(Retrieves a single message from a queue.r#�r��max_messagesr��r�r�rr�
properties�
delivery_infoz-queue:%s got message, ack_id: %s, payload: %szauto acking message ack_id: %s�r��ack_id�
message_idrW�gcpubsub_message)rrbr��pullrWrr��wait_time_secondsrr	rA�received_messagesr�r!r��datarhr��_is_auto_ack�_do_ackr�rYr6)	r4r�rr��responser�r��payloadr�r+r+r,�_getysD

�
�

��
zChannel._get�dict)�payload_propertiescCs&|dd}|d}|tkp$||jvS)Nr�r��
delivery_mode)rr�)r4r�r�r�r+r+r,r��s
�zChannel._is_auto_ackcCs:|�|�}|j|}|��}|s&t��z,|jj|j|d�t|jd�|pJ|j	d�}Wnt
yjt��Yn0|j}t|�dkr�t��g}g}	t
�dt|�|�|D]n}
|
j}tt|
jj��}|dd}
|||
jj|jd�|
d	<|�|d�r�|�|�n|j�|�|	�|�q�|�r2t
�d
|�|�||j�||	fS)z(Retrieves bulk of messages from a queue.r�r�r�rz#batching %d messages from queue: %sr�r�r�r�zauto acking ack_ids: %s)rrb�_get_max_messages_estimater	r�r�rWrr�r�rr�rArhr�r�r!rr�r�r�r�r6rYr�)r4r�rZprefixed_queuer�r�r�r�Zauto_ack_idsZret_payloadsr�r�r�r�r+r+r,�	_get_bulk�sT

�
�
��
zChannel._get_bulk)rzcCs |j��}|j}|dur|S|Sr0)�qosZcan_consume_max_estimate�bulk_max_messages)r4Zmax_allowedZmax_if_unlimitedr+r+r,r��s
z"Channel._get_max_messages_estimatecsh|jj�|i�}|s$t��|||�S|�|��|�|�|||�}|rH|St�	d|�|�
|||�|gS)Nz3no queues bound to exchange: %s, binding on the fly)�stateZ	exchangesrQrfr�r��lookupZ	get_tablerhr�Z
queue_bind)r4r�r��defaultZ
exchange_info�retrxr+r,r��s"
��zChannel._lookup)r�rzcCs�|�|�}||jvrdS|j|}tj|j|jdtj��dd�j|j	d�}t
t��"tdd�|D��Wd�S1sz0Yd	S)
z�Return the number of messages in a queue.

        This is a *rough* estimation, as Pub/Sub doesn't provide
        an exact API.
        rz;pubsub.googleapis.com/subscription/num_undelivered_messagesr#)�end_time�minutes)rVcss|]}|jdjjVqdS)rN)Zpoints�valueZint64_value)r)�contentr+r+r,�	<genexpr>sz Channel._size.<locals>.<genexpr>Nr<)
rrbrZQuery�monitorrn�datetime�nowZselect_resourcesrVrr�sum)r4r�r��resultr+r+r,�_size�s$


��
�&z
Channel._sizec	s||rtd��|j�|�j}|d}|d}|d}t�d||�|d}|�|g|�|j|}|j�	|�t
��|�dS)zAcknowledge one message.zmultiple acks not implementedr�r�r�z!ack message. queue: %s ack_id: %srWN)r�r�rQr�rhr�r�rbrYr@rf�	basic_ack)	r4Zdelivery_tag�multipler�Zpubsub_messager�r�rWr�rxr+r,r�s
zChannel.basic_ackz	list[str])�ack_idsrWcCs"|jj||d�t|jd�d�dS)N)r�r�r�)r�r�)r�Zacknowledgerr�)r4r�rWr+r+r,r�#s
�zChannel._do_ack)r�cCsH|�|�}|j�|�}|sdS|�|�}|jj|jtj��d�d�|S)z'Delete all current messages in a queue.N)r��timer�)	rrbrQr�r��seekrWr�r�)r4r�r�rNr+r+r,�_purge)s

��zChannel._purgec	Cs�t��}t�d|�|jd}t||jd�}|j�|�s�|j	�
�D]d}t|j�dkrht�
d||j�qDt�
d||jt|j�t|j��|jj|jt|j�|jd�d�qDq.t�d	|�dS)
Nz/unacked deadline extension thread: [%s] started��rz'thread [%s]: no unacked messages for %sz5thread [%s]: extend ack deadline for %s: %d msgs [%s])r�r�r�r�z.unacked deadline extension thread [%s] stopped)rp�
get_native_idrhri�_min_ack_deadline�maxr�rsrrb�valuesrArYr�rWr9r�Zmodify_ack_deadline)r4�	thread_idZmin_deadline_sleepZ
sleep_timer�r+r+r,rr9s@�
����
�z Channel._extend_unacked_deadlinecCs8|�|�}|j�|j|�}t�d||�|j�|�dS)Nz0after_reply_message_received: queue: %s, sub: %s)rr�rWrnrhr�rcr�)r4r��subr+r+r,�after_reply_message_received\s
�z$Channel.after_reply_message_receivedcCst�Sr0)rr3r+r+r,r�dszChannel.subscribercCst�Sr0)rr3r+r+r,r�hszChannel.publishercCst��Sr0)rZMetricServiceClientr3r+r+r,r�lszChannel.monitorcCs|jjSr0)�
connection�clientr3r+r+r,rjpszChannel.conninfocCs
|jjjSr0)r�r��transport_optionsr3r+r+r,r�tszChannel.transport_optionscCs|j�d|j�S)Nr�)r�rQ�default_wait_time_secondsr3r+r+r,r�xs�zChannel.wait_time_secondscCs|j�d|j�S)Nr�)r�rQ�default_retry_timeout_secondsr3r+r+r,r�~s�zChannel.retry_timeout_secondscCs|j�d|j�S)Nr�)r�rQ�default_ack_deadline_secondsr3r+r+r,r��s�zChannel.ack_deadline_secondscCs|j�dd�S)Nr|zkombu-)r�rQr3r+r+r,r|�szChannel.queue_name_prefixcCs|j�d|j�S)Nr�)r�rQ�default_expiration_secondsr3r+r+r,r��s�zChannel.expiration_secondscCs|j�d|j�S)Nr�)r�rQ�default_bulk_max_messagesr3r+r+r,r��s�zChannel.bulk_max_messagescs�t�d�|jrb|j��}tt��.t�d|�|jjd|id�Wd�q
1sV0Yq
|j�	�s�|j
��tj
��t���dS)zClose the channel.zclosing channelzdeleting subscription: %sr�r�N)rhr�rcr=r�	Exceptionr�r�rorPrs�setr]ra�joinrf�close)r4r�rxr+r,r��s


�&


z
Channel.closecCs|d�di��dd�}|S)Nr�r�r��)rQ)r�r�r+r+r,r��s���zChannel._get_routing_key)N)NNNNNN)N)N)F)=rErFrGrHZsupports_fanoutZ
do_restorer�r�r�r�r�r�r�r�rarZrp�EventrsrIrorbrcr5�CHARS_REPLACE_TABLErr�r�r�r�r�r�r�r�r�r�r�r�r�r�r�r�rrr�r"r�r�r��propertyrjr�r�r�r�r|r�r�r��staticmethodr��
__classcell__r+r+rxr,r]�s�
C��(+3#










r]cs�eZdZdZeZdZdZejj	e
jfZ	ejje
je
je
je
je
jfejfZdZdZejjjeddg�d�Z�fd	d
�Zdd�Zed
d
d�dd��Zedd
d
d�dd��Zddd�Zdd�Z dd�Z!dd�Z"�Z#S) rlzGCP Pub/Sub transport.Tg�������?ZgcpubsubZ	pubsub_v1r�r�)r�cs(t�j|fi|��t�|_t�|_dSr0)rfr5r�_poolr��_get_bulk_future_to_queue)r4r�rwrxr+r,r5�szTransport.__init__cCstjSr0)�package_version�__version__r3r+r+r,�driver_version�szTransport.driver_versionrS)�urirzcCs|�d�d}|�d�S)Nzgcpubsub://projects/r#�/)�split�strip)rr�r+r+r,rm�szTransport.parse_uriF�**cCs|pdS)Nzgcpubsub://r+)r4rZinclude_password�maskr+r+r,�as_uri�szTransport.as_uriNcCspt�}|j}|r |r ||kr |}z|j|d�Wqltyf|rVt�||krVt��|rbt|�Yq 0qlq dS)Nr)r�polling_interval�_drain_from_active_queuesr	�socket_timeoutr)r4r�rZ
time_startr
r+r+r,�drain_events�szTransport.drain_eventsc	Cs�|��|jdd�t|j|td�\}}dd�|D�}||8}|D]}|j�|d�qB|s`t��t�dt	|��|D]X}|�
�\}}|D]4}t�d|�||jvr�t�d|�q�|�
||�q�|j�|d�qtdS)	Nr^r)r�return_whencSsh|]}|��r|�qSr+)�	exception�r)�fr+r+r,�	<setcomp>r.z6Transport._drain_from_active_queues.<locals>.<setcomp>zgot %d done get_bulk tasksz consuming message from queue: %sz&Message for queue %s without consumers)�_rm_empty_bulk_requests�_submit_get_bulk_requestsrr�rr=r	rhr�rAr��
_callbacks�warningZ_deliver)	r4r�doner'�emptyrr�Zpayloadsr�r+r+r,r�s2�

�z#Transport._drain_from_active_queuescCs,dd�|jD�}|D]}|j�|d�qdS)NcSs h|]}|��r|��r|�qSr+)rrrr+r+r,rs�z4Transport._rm_empty_bulk_requests.<locals>.<setcomp>)r�r=)r4rrr+r+r,rs
�z!Transport._rm_empty_bulk_requestscCsPt|j���}|jD]6}|jD]*}||vr,q|j�|j||�}||j|<qqdSr0)r�r�r�ZchannelsZ_active_queuesr��submitr�)r4rZqueues_with_submitted_get_bulkZchannelr��futurer+r+r,r s�

z#Transport._submit_get_bulk_requests)Fr)N)$rErFrGrHr]Z
can_parse_urlr
r$rlZconnection_errors�pubsub_exceptions�TimeoutErrorZchannel_errors�publisher_exceptionsZFlowControlLimitErrorZMessageTooLargeErrorZPublishErrorZ#PublishToPausedOrderingKeyException�subscriber_exceptionsZAcknowledgeErrorZdriver_typeZdriver_nameZ
implementsr;�	frozensetr5rr�rm�classmethodr	r
rrrr�r+r+rxr,rl�sB�����
�
#	rl)MrH�
__future__rr[r��stringrp�concurrent.futuresrrrr�
contextlibr�osrr�r	r
r�rr�uuidr
r�_socketrrrZgoogle.api_core.exceptionsrrrZgoogle.api_core.retryrZgoogle.cloudrZgoogle.cloud.monitoring_v3rZgoogle.cloud.pubsub_v1rrrrZ google.cloud.pubsub_v1.publisherrZ!google.cloud.pubsub_v1.subscriberrZgoogle.pubsub_v1rrZkombu.entityrZ	kombu.logrZkombu.utils.encodingrrZkombu.utils.jsonr r!Zkombu.utils.objectsr"r�r$rhr��punctuationZPUNCTUATIONS_TO_REPLACEr(r�r/rIZ	dataclassrRr]rlr+r+r+r,�<module>sZ'��