HEX
Server: LiteSpeed
System: Linux kapuas.iixcp.rumahweb.net 5.14.0-427.42.1.el9_4.x86_64 #1 SMP PREEMPT_DYNAMIC Fri Nov 1 14:58:02 EDT 2024 x86_64
User: mirz4654 (1666)
PHP: 8.1.33
Disabled: system,exec,escapeshellarg,escapeshellcmd,passthru,proc_close,proc_get_status,proc_nice,proc_open,proc_terminate,shell_exec,popen,pclose,dl,pfsockopen,leak,apache_child_terminate,posix_kill,posix_mkfifo,posix_setsid,posix_setuid,posix_setpgid,ini_alter,show_source,define_syslog_variables,symlink,syslog,openlog,closelog,ocinumcols,listen,chgrp,apache_note,apache_setenv,debugger_on,debugger_off,ftp_exec,dll,ftp,myshellexec,socket_bind,mail,posix_getwpuid
Upload Files
File: //usr/local/lib/python3.9/site-packages/kombu/transport/__pycache__/confluentkafka.cpython-39.pyc
[Binary payload of the file listed above: CPython 3.9 bytecode for
kombu/transport/confluentkafka.py. The pyc header, import table and code objects are not
human-readable; the module docstring, embedded as a string constant, follows.]

confluent-kafka transport module for Kombu.

Kafka transport using the confluent-kafka library.

**References**

- http://docs.confluent.io/current/clients/confluent-kafka-python

**Limitations**

The confluent-kafka transport does not support the PyPy environment.

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: No
* Supports Priority: No
* Supports TTL: No

Connection String
=================
The connection string has the following format:

.. code-block::

    confluentkafka://[USER:PASSWORD@]KAFKA_ADDRESS[:PORT]
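
For example, a broker on the default Kafka port could be reached with the sketch below;
the host, port and queue name are placeholders chosen for the example, not values taken
from this module:

.. code-block:: python

    from kombu import Connection

    # Hypothetical broker address; substitute your own Kafka host and port.
    with Connection('confluentkafka://localhost:9092') as conn:
        queue = conn.SimpleQueue('demo-topic')
        queue.put({'hello': 'world'})
        queue.close()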

Transport Options
=================
* ``connection_wait_time_seconds`` - Time in seconds to wait for the connection
  to succeed. Default: ``5``.
* ``wait_time_seconds`` - Time in seconds to wait when polling for messages.
  Default: ``5``.
* ``security_protocol`` - Protocol used to communicate with the broker. See
  https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md for the
  valid values. Default: ``plaintext``.
* ``sasl_mechanism`` - SASL mechanism to use for authentication. See
  https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md for the
  valid values.
* ``num_partitions`` - Number of partitions to create for new topics. Default: ``1``.
* ``replication_factor`` - Replication factor used when creating topics. Default: ``1``.
* ``topic_config`` - Topic configuration. Must be a dict whose key-value pairs
  correspond to the topic-level configuration options documented at
  http://kafka.apache.org/documentation.html#topicconfigs.
* ``kafka_common_config`` - Configuration applied to the producer, consumer and
  admin client. Must be a dict whose key-value pairs correspond to the options
  documented at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
* ``kafka_producer_config`` - Producer configuration. Must be a dict whose
  key-value pairs correspond to the options documented at
  https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
* ``kafka_consumer_config`` - Consumer configuration. Must be a dict whose
  key-value pairs correspond to the options documented at
  https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
* ``kafka_admin_config`` - Admin client configuration. Must be a dict whose
  key-value pairs correspond to the options documented at
  https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
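
As an illustration, these options are supplied through Kombu's ``transport_options``
argument. The broker address, credentials and option values below are placeholders
chosen for the example, not defaults of this transport:

.. code-block:: python

    from kombu import Connection

    connection = Connection(
        'confluentkafka://user:password@localhost:9092',
        transport_options={
            'connection_wait_time_seconds': 5,
            'wait_time_seconds': 5,
            'security_protocol': 'sasl_plaintext',  # see librdkafka CONFIGURATION.md
            'sasl_mechanism': 'SCRAM-SHA-512',
            'num_partitions': 3,
            'replication_factor': 1,
            'topic_config': {'retention.ms': 604800000},
            'kafka_common_config': {'client.id': 'example-client'},
        },
    )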
[The embedded docstring ends here. The remainder of the .pyc consists of marshalled code
objects in which only string constants are human-readable. The recoverable structure of
``kombu/transport/confluentkafka.py`` is:]

* Imports: ``kombu.transport.virtual``, ``kombu.utils.cached_property``,
  ``kombu.utils.encoding.str_to_bytes``, ``kombu.utils.json`` (``dumps``, ``loads``),
  ``kombu.log.get_logger`` and, behind a ``try``/``except ImportError`` guard,
  ``confluent_kafka`` (``Consumer``, ``KafkaException``, ``Producer``,
  ``TopicPartition``, ``AdminClient``, ``NewTopic``).
* A ``DEFAULT_PORT`` constant (9092, the Kafka default).
* ``NoBrokersAvailable``: "Kafka broker is not available exception."; marked
  ``retriable = True``.
* ``Message``: "Message object."; stores the payload's ``topic`` on ``__init__``.
* ``QoS``: "Quality of Service guarantees."; tracks not-yet-acked deliveries and
  implements ``can_consume``, ``can_consume_max_estimate``, ``append``, ``get``,
  ``ack``, ``reject`` (seeking back to the committed offset on requeue) and a no-op
  ``restore_unacked_once``.
* ``Channel``: "Kafka Channel."; keeps per-topic producer and consumer instances and
  implements ``sanitize_queue_name`` ("celery sometimes pushes in @ signs"), ``_put``,
  ``_get``, ``_delete``, ``_size``, ``_new_queue``, ``_has_queue`` and ``_open``, plus
  the ``client``, ``options``, ``conninfo``, ``wait_time_seconds``,
  ``connection_wait_time_seconds`` and ``common_config`` properties.
* ``Transport``: "Kafka Transport."; ``driver_type = 'kafka'``,
  ``driver_name = 'confluentkafka'``, default port ``DEFAULT_PORT``, with
  ``NoBrokersAvailable`` among its recoverable connection errors.
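
The readable strings above can be recovered from the compiled file with the standard
library alone. The sketch below is illustrative, assuming the path shown in the listing
and a matching CPython 3.9 interpreter; ``dump_structure`` is a hypothetical helper, not
part of kombu:

.. code-block:: python

    import marshal

    # Hypothetical path, taken from the file listing above.
    PYC = ('/usr/local/lib/python3.9/site-packages/kombu/transport/'
           '__pycache__/confluentkafka.cpython-39.pyc')

    def dump_structure(path):
        """Print the module docstring and the names of nested code objects."""
        with open(path, 'rb') as fh:
            fh.read(16)              # skip the PEP 552 header (magic, flags, mtime/hash, size)
            code = marshal.load(fh)  # top-level code object of the module

        # For a module with a docstring, the first constant is usually that docstring.
        print(code.co_consts[0])

        # Recurse into nested code objects (class and function bodies).
        def walk(obj, indent=0):
            for const in obj.co_consts:
                if hasattr(const, 'co_code'):
                    print(' ' * indent + const.co_name)
                    walk(const, indent + 2)

        walk(code)

    dump_structure(PYC)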