HEX
Server: LiteSpeed
System: Linux kapuas.iixcp.rumahweb.net 5.14.0-427.42.1.el9_4.x86_64 #1 SMP PREEMPT_DYNAMIC Fri Nov 1 14:58:02 EDT 2024 x86_64
User: mirz4654 (1666)
PHP: 8.1.33
Disabled: system,exec,escapeshellarg,escapeshellcmd,passthru,proc_close,proc_get_status,proc_nice,proc_open,proc_terminate,shell_exec,popen,pclose,dl,pfsockopen,leak,apache_child_terminate,posix_kill,posix_mkfifo,posix_setsid,posix_setuid,posix_setpgid,ini_alter,show_source,define_syslog_variables,symlink,syslog,openlog,openlog,closelog,ocinumcols,listen,chgrp,apache_note,apache_setenv,debugger_on,debugger_off,ftp_exec,dll,ftp,myshellexec,socket_bind,mail,posix_getwpuid
Upload Files
File: //proc/self/root/usr/local/lib/python3.9/site-packages/kombu/__pycache__/common.cpython-39.pyc
a

X>h�5�@s�dZddlmZddlZddlZddlZddlmZddlm	Z	ddl
mZddlm
Z
ddlmZmZmZmZdd	lmZmZd
dlmZmZd
dlmZd
d
lmZd
dlmZdZdZ ee!�Z"da#dd�Z$dd�Z%d;dd�Z&Gdd�de�Z'dd�Z(d<dd�Z)dd �Z*d!d"�Z+d#d$�Z,d=d%d&�Z-d>d'd(�Z.d?d)d*�Z/d@d+d,�Z0d-d.�Z1d/d0�Z2e	d1d2��Z3dAd3d4�Z4dBd5d6�Z5dCd7d8�Z6Gd9d:�d:�Z7dS)DzCommon Utilities.�)�annotationsN)�deque)�contextmanager)�partial)�count)�
NAMESPACE_OID�uuid3�uuid4�uuid5)�ChannelError�RecoverableConnectionError�)�Exchange�Queue)�
get_logger)�registry)�uuid)	�	Broadcast�
maybe_declarer�itermessages�
send_reply�collect_replies�insured�drain_consumer�	eventloopi��cCstdurt�jatS�N)�_node_idr	�int�rr�6/usr/local/lib/python3.9/site-packages/kombu/common.py�get_node_id"sr cCsLd�|||t|��}zttt|��}Wn tyFttt|��}Yn0|S)Nz{:x}-{:x}-{:x}-{:x})�format�id�strrr�
ValueErrorr
)Znode_id�
process_id�	thread_id�instance�ent�retrrr�generate_oid)s�r*TcCs tt�t��|rt��nd|�S�Nr)r*r �os�getpid�	threading�	get_ident)r'�threadsrrr�oid_from3s�r1cs,eZdZdZejdZd�fdd�	Z�ZS)	ra�Broadcast queue.

    Convenience class used to define broadcast queues.

    Every queue instance will have a unique name,
    and both the queue and exchange is configured with auto deletion.

    Arguments:
    ---------
        name (str): This is used as the name of the exchange.
        queue (str): By default a unique id is used for the queue
            name for every consumer.  You can specify a custom
            queue name here.
        unique (bool): Always create a unique queue
            even if a queue name is supplied.
        **kwargs (Any): See :class:`~kombu.Queue` for a list
            of additional keyword arguments supported.
    ))�queueNNFTc
sb|rd�|pdt��}n|p&dt���}t�jf|p6|||||durH|n
t|dd�d�|��dS)Nz{}.{}Zbcastzbcast.Zfanout)�type)�aliasr2�name�auto_delete�exchange)r!r�super�__init__r)�selfr5r2�uniquer6r7r4�kwargs��	__class__rrr9Rs
��zBroadcast.__init__)NNFTNN)�__name__�
__module__�__qualname__�__doc__r�attrsr9�
__classcell__rrr=rr<s
�rcCs||jjjvSr)�
connection�client�declared_entities)�entity�channelrrr�declaration_cachedisrJFcKs |rt||fi|��St||�S)zDeclare entity (cached).)�_imaybe_declare�_maybe_declare)rHrI�retry�retry_policyrrrrmsrcCs0|j}|s,|s"td|�d|����|�|�}|S)z�Make sure the channel is bound to the entity.

    :param entity: generic kombu nomenclature, generally an exchange or queue
    :param channel: channel to bind to the entity
    :return: the updated entity
    zCannot bind channel z to entity )�is_boundr�bind)rHrIrOrrr�_ensure_channel_is_boundts�
rQcCs�|}t||�|dus |jdur<|js6td|�d���|j}d}}|jrn|jrn|jjj}t|�}||vrndS|js|t	d��|j
|d�|dur�|r�|�|�|dur�|j|_dS)Nzchannel is None and entity z not bound.F�channel disconnected)rIT)
rQrErOrrIZcan_cache_declarationrFrG�hashrZdeclare�addr5)rHrI�origZdeclared�identrrrrL�s,

�

rLcKs:t||�}|jjstd��|jjjj|tfi|��||�S)NrR)rQrIrErrFZensurerL)rHrIrNrrrrK�s

���rKc
#s�t���fdd�}|g|pg|_|�Nt|jjj||dd�D]&}z���VWq>tybYq>0q>Wd�n1sz0YdS)z&Drain messages from consumer instance.cs��||f�dSr)�append)�body�message��accrr�
on_message�sz"drain_consumer.<locals>.on_messageT)�limit�timeout�ignore_timeoutsN)r�	callbacksrrIrErF�popleft�
IndexError)Zconsumerr]r^r`r\�_rrZrr�s
�
rcKs$t|jf|g|d�|��|||d�S)zIterator over messages.)�queuesrI)r]r^r`)rZConsumer)�connrIr2r]r^r`r<rrrr�s�rc	csN|rt|�pt�D]6}z|j|d�VWqtjyF|rB|sB�Yq0qdS)aBest practice generator wrapper around ``Connection.drain_events``.

    Able to drain events forever, with a limit, and optionally ignoring
    timeout errors (a timeout of 1 is often used in environments where
    the socket can get "stuck", and is a best practice for Kombu consumers).

    ``eventloop`` is a generator.

    Examples
    --------
        >>> from kombu.common import eventloop

        >>> def run(conn):
        ...     it = eventloop(conn, timeout=1, ignore_timeouts=True)
        ...     next(it)   # one event consumed, or timed out.
        ...
        ...     for _ in eventloop(conn, timeout=1, ignore_timeouts=True):
        ...         pass  # loop forever.

    It also takes an optional limit parameter, and timeout errors
    are propagated by default::

        for _ in eventloop(connection, limit=1, timeout=1):
            pass

    See Also
    --------
        :func:`itermessages`, which is an event loop bound to one or more
        consumers, that yields any messages received.
    )r^N)�rangerZdrain_events�socketr^)rer]r^r_�irrrr�src	KsH|j|f|||d�t|jd|j�d�tj|j|jd�fi|����S)a�Send reply for request.

    Arguments:
    ---------
        exchange (kombu.Exchange, str): Reply exchange
        req (~kombu.Message): Original request, a message with
            a ``reply_to`` property.
        producer (kombu.Producer): Producer instance
        retry (bool): If true must retry according to
            the ``reply_policy`` argument.
        retry_policy (Dict): Retry settings.
        **props (Any): Extra properties.
    )r7rMrNZreply_to�correlation_id)Zrouting_keyri�
serializer�content_encoding)�publish�dictZ
properties�get�serializersZtype_to_name�content_typerk)r7�req�msgZproducerrMrN�propsrrrr�s��


���rc		osv|�dd�}d}zNt|||g|�Ri|��D]\}}|s@|��d}|Vq,W|rr|�|j�n|rp|�|j�0dS)z,Generator collecting replies from ``queue``.�no_ackTFN)�
setdefaultrZackZafter_reply_message_receivedr5)	rerIr2�argsr<rtZreceivedrXrYrrrrs 
��
�rcCstjd||dd�dS)Nz#Connection error: %r. Retry in %ss
T)�exc_info)�logger�error)�exc�intervalrrr�_ensure_errbacks�r|c	cs*z
dVWn|j|jy$Yn0dSr)Zconnection_errorsZchannel_errors)rerrr�_ignore_errorss
r}cOsB|r:t|��||i|��Wd�S1s00Yt|�S)a�Ignore connection and channel errors.

    The first argument must be a connection object, or any other object
    with ``connection_error`` and ``channel_error`` attributes.

    Can be used as a function:

    .. code-block:: python

        def example(connection):
            ignore_errors(connection, consumer.channel.close)

    or as a context manager:

    .. code-block:: python

        def example(connection):
            with ignore_errors(connection):
                consumer.channel.close()


    Note:
    ----
        Connection and channel errors should be properly handled,
        and not ignored.  Using this function is only acceptable in a cleanup
        phase, like when a connection is lost or at shutdown.
    N)r})re�funrvr<rrr�
ignore_errors's
,rcCs|r||�dSrr)rErI�	on_reviverrr�revive_connectionIsr�c
Ks�|pt}|jdd��h}|j|d�|j}tt||d�}	|j||f||	d�|��}
|
|it||d���\}}|Wd�S1s�0YdS)z�Function wrapper to handle connection errors.

    Ensures function performing broker commands completes
    despite intermittent connection failures.
    T)�block)�errback)r�)r�r�)rEN)r|�acquireZensure_connectionZdefault_channelrr�Z	autoretryrm)
�poolr~rvr<r�r��optsrerIZreviver�retvalrcrrrrNs��rc@s@eZdZdZdZdd�Zddd�Zddd	�Zd
d�Zdd
�Z	dS)�QoSa�Thread safe increment/decrement of a channels prefetch_count.

    Arguments:
    ---------
        callback (Callable): Function used to set new prefetch count,
            e.g. ``consumer.qos`` or ``channel.basic_qos``.  Will be called
            with a single ``prefetch_count`` keyword argument.
        initial_value (int): Initial prefetch count value..

    Example:
    -------
        >>> from kombu import Consumer, Connection
        >>> connection = Connection('amqp://')
        >>> consumer = Consumer(connection)
        >>> qos = QoS(consumer.qos, initial_prefetch_count=2)
        >>> qos.update()  # set initial

        >>> qos.value
        2

        >>> def in_some_thread():
        ...     qos.increment_eventually()

        >>> def in_some_other_thread():
        ...     qos.decrement_eventually()

        >>> while 1:
        ...    if qos.prev != qos.value:
        ...        qos.update()  # prefetch changed so update.

    It can be used with any function supporting a ``prefetch_count`` keyword
    argument::

        >>> channel = connection.channel()
        >>> QoS(channel.basic_qos, 10)


        >>> def set_qos(prefetch_count):
        ...     print('prefetch count now: %r' % (prefetch_count,))
        >>> QoS(set_qos, 10)
    NcCs||_t��|_|pd|_dSr+)�callbackr.�RLock�_mutex�value)r:r��
initial_valuerrrr9�s
zQoS.__init__r
cCsD|j�(|jr |jt|d�|_Wd�n1s40Y|jS)z�Increment the value, but do not update the channels QoS.

        Note:
        ----
            The MainThread will be responsible for calling :meth:`update`
            when necessary.
        rN)r�r��max�r:�nrrr�increment_eventually�s0zQoS.increment_eventuallycCsP|j�4|jr,|j|8_|jdkr,d|_Wd�n1s@0Y|jS)z�Decrement the value, but do not update the channels QoS.

        Note:
        ----
            The MainThread will be responsible for calling :meth:`update`
            when necessary.
        r
N)r�r�r�rrr�decrement_eventually�s
$zQoS.decrement_eventuallycCsH||jkrD|}|tkr&t�dt�d}t�d|�|j|d�||_|S)z#Set channel prefetch_count setting.z(QoS: Disabled: prefetch_count exceeds %rrzbasic.qos: prefetch_count->%s)Zprefetch_count)�prev�PREFETCH_COUNT_MAXrx�warning�debugr�)r:Zpcount�	new_valuerrr�set�s
�zQoS.setcCs6|j�|�|j�Wd�S1s(0YdS)z)Update prefetch count with current value.N)r�r�r�)r:rrr�update�sz
QoS.update)r
)r
)
r?r@rArBr�r9r�r�r�r�rrrrr�bs*


r�)T)NF)r
NN)r
NN)NNF)NFN)N)N)NN)8rB�
__future__rr,rgr.�collectionsr�
contextlibr�	functoolsr�	itertoolsrrrrr	r
ZamqprrrHrr�logrZ
serializationrroZ
utils.uuid�__all__r�r?rxrr r*r1rrJrrQrLrKrrrrrr|r}rr�rr�rrrr�<module>sT

	-


�
	
(�


"