HEX
Server: LiteSpeed
System: Linux kapuas.iixcp.rumahweb.net 5.14.0-427.42.1.el9_4.x86_64 #1 SMP PREEMPT_DYNAMIC Fri Nov 1 14:58:02 EDT 2024 x86_64
User: mirz4654 (1666)
PHP: 8.1.33
Disabled: system,exec,escapeshellarg,escapeshellcmd,passthru,proc_close,proc_get_status,proc_nice,proc_open,proc_terminate,shell_exec,popen,pclose,dl,pfsockopen,leak,apache_child_terminate,posix_kill,posix_mkfifo,posix_setsid,posix_setuid,posix_setpgid,ini_alter,show_source,define_syslog_variables,symlink,syslog,openlog,openlog,closelog,ocinumcols,listen,chgrp,apache_note,apache_setenv,debugger_on,debugger_off,ftp_exec,dll,ftp,myshellexec,socket_bind,mail,posix_getwpuid
Upload Files
File: //usr/local/lib/python3.9/site-packages/kombu/__pycache__/connection.cpython-39.pyc
a

X>h��@s�dZddlmZddlZddlZddlZddlmZddlm	Z	m
Z
ddlmZddl
mZmZzddlmZd	ZWney�dZd
ZYn0ddlmZdd
lmZddlmZddlmZmZddlmZddlm Z m!Z!m"Z"m#Z#ddl$m%Z%ddl&m'Z'm(Z(m)Z)m*Z*m+Z+e�rTddl,m-Z-ej.dk�r<ddl/m0Z0nddl
m0Z0ddl1m2Z2dZ3ee4�Z5e
Z6ddd�Z7e6e#d�Z8ej9�:dd
�Z;ej9�:dd
�Z<Gdd�d�Z=e=Z>Gd d!�d!e=�Z?Gd"d#�d#e�Z@Gd$d%�d%e�ZAd&d'd(�d)d*�ZBd+d,d-�d.d/�ZCdS)0zClient (Connection).�)�annotationsN)�contextmanager)�count�cycle)�
itemgetter)�
TYPE_CHECKING�Any)�	CERT_NONETF)�
exceptions�)�
get_logger)�Resource)�get_transport_cls�supports_librabbitmq)�	HashedSeq)�
dictfilter�lazy�retry_over_time�shufflecycle)�cached_property)�as_url�maybe_sanitize_url�	parse_url�quote�urlparse)�Channel)��
)�	TypeGuard)�
TracebackType)�
Connection�ConnectionPool�ChannelPool�amqp)Zpyamqp�librabbitmq)�round-robin�shuffleZKOMBU_LOG_CONNECTIONZKOMBU_LOG_CHANNELc@s�eZdZdZdZdZdZdZdZdZ	dZ
dZdZdZ
dZdZdZdZeZeZdZZZZZd�d	d
�Zdd�Zd
d�Zdd�Zdd�Zdd�Zdd�Zdd�Z d�dd�Z!dd�Z"dd�Z#d d!�Z$d"d#�Z%d$d%�Z&d�d&d'�Z'd(d)�Z(e(Z)d*d+�Z*d�d.d/�Z+e,e-j.e-j.fd0d1��Z/e,d2d3��Z0d4d5�Z1d6d7�Z2d�d9d:�Z3d�d;d<�Z4d=d>�Z5d?d@�Z6dAdB�Z7dCdD�Z8d�dEdF�Z9dGdH�Z:dIdJ�Z;ddKe<dLdMdNdOdP�fdQdR�dSdT�Z=d�dUdV�Z>d�dWdX�Z?d�dYdZ�Z@d�d[d\�ZAd�d]d^�ZBd�d_d`�ZCdadb�ZDdcdd�ZEdedf�ZFdgdh�ZGdidj�ZHdkdl�ZIdmdndodpdq�drds�ZJeKdtdu��ZLdvdw�ZMeKdxdy��ZNeKdzd{��ZOd|d}�ZPeKd~dR�dd���ZQeKd�d���ZReKd�d���ZSeTd�d���ZUd�d��ZVeTd�d���ZWeTd�d���ZXeTd�d���ZYeTd�d���ZZeKd�d���Z[eKd�d���Z\dS)�r af
A connection to the broker.

    Example:
    -------
        >>> Connection('amqp://guest:guest@localhost:5672//')
        >>> Connection('amqp://foo;amqp://bar',
        ...            failover_strategy='round-robin')
        >>> Connection('redis://', transport_options={
        ...     'visibility_timeout': 3000,
        ... })

        >>> import ssl
        >>> Connection('amqp://', login_method='EXTERNAL', ssl={
        ...    'ca_certs': '/etc/pki/tls/certs/something.crt',
        ...    'keyfile': '/etc/something/system.key',
        ...    'certfile': '/etc/something/system.cert',
        ...    'cert_reqs': ssl.CERT_REQUIRED,
        ... })

    Note:
    ----
        SSL currently only works with the py-amqp, qpid and redis
        transports.  For other transports you can use stunnel.

    Arguments:
    ---------
        URL (str, Sequence): Broker URL, or a list of URLs.

    Keyword Arguments:
    -----------------
        ssl (bool/dict): Use SSL to connect to the server.
            Default is ``False``.
            May not be supported by the specified transport.
        transport (Transport): Default transport if not specified in the URL.
        connect_timeout (float): Timeout in seconds for connecting to the
            server. May not be supported by the specified transport.
        transport_options (Dict): A dict of additional connection arguments to
            pass to alternate kombu channel implementations.  Consult the
            transport documentation for available options.
        heartbeat (float): Heartbeat interval in int/float seconds.
            Note that if heartbeats are enabled then the
            :meth:`heartbeat_check` method must be called regularly,
            around once per second.

    Note:
    ----
        The connection is established lazily when needed. If you need the
        connection to be established, then force it by calling
        :meth:`connect`::

            >>> conn = Connection('amqp://')
            >>> conn.connect()

        and always remember to close the connection::

            >>> conn.release()

    These options have been replaced by the URL argument, but are still
    supported for backwards compatibility:

    :keyword hostname: Host name/address.
        NOTE: You cannot specify both the URL argument and use the hostname
        keyword argument at the same time.
    :keyword userid: Default user name if not provided in the URL.
    :keyword password: Default password if not provided in the URL.
    :keyword virtual_host: Default virtual host if not provided in the URL.
    :keyword port: Default port if not provided in the URL.
    N�/�Fr%�	localhostrcKs�|durgn|}|||||||||	||
d�}|_|r^t|t�s^|�|�|d}|j|d�|�rd|vr�|�d�|}|d}|j|d�d|vr�d|d|�d��vr�|�dd�\|d<|d	<|d|_nJd|v�r|p�t|�j	}t
|�j�st|�}|jt
|�|d	d�||d<|jfi|��||_|�p8d
|_|j�|j��pP|j|_|j�rt|�|j�|_t|j�|
du�r�i}
|
|_t�r�d|_|�r�||_t�|_dS)N)�hostname�userid�password�virtual_host�port�insist�ssl�	transport�connect_timeout�login_method�	heartbeatr)r*�;�://�+rr1r*r%T)�_initial_params�
isinstance�str�extend�update�split�index�
uri_prefixr�schemer�
can_parse_urlrr�_init_params�alt�_failover_strategy�failover_strategies�get�failover_strategyr�next�transport_options�_log_connection�_logger�set�declared_entities)�selfr*r+r,r-r.r/r0r1r2rIr3r?r4rG�
alternates�kwargsrC�paramsZ
url_params�rR�:/usr/local/lib/python3.9/site-packages/kombu/connection.py�__init__�s`�

�
���

zConnection.__init__cCsR|��|j��d|_d|vr(t|�nd|i}|jfit|jfi|����dS)z�Switch connection parameters to use a new URL or hostname.

        Note:
        ----
            Does not reconnect!

        Arguments:
        ---------
            conn_str (str): either a hostname or URL.
        Fr6r*N)�closerM�clear�_closedrrB�dictr8)rNZconn_strZconn_paramsrRrRrS�switch�s
�zConnection.switchcCs|jr|�t|j��dS)z:Switch to next URL given by the current failover strategy.N)rrYrH�rNrRrRrS�maybe_switch_nextszConnection.maybe_switch_nextcCs�|pd}|dkrt�rd}|dkr<tr<|s<t�d�dti}||_||_||_|
|_|p\|j	|_	|ph|j
|_
||_|	|_||_
||_|o�t|�|_dS)Nr#r$ZredisszaSecure redis scheme specified (rediss) with no ssl options, defaulting to insecure SSL behaviour.Z
ssl_cert_reqs)r�
ssl_available�logger�warningr	r*r+r,r3r-r.r/r2r0�
transport_cls�floatr4)rNr*r+r,r-r.r/r0r1r2r3r4rRrRrSrBs&�zConnection._init_paramscCs|j�|j|�dS�N)r1�register_with_event_loop�
connection)rN�looprRrRrSrbsz#Connection.register_with_event_loopcOs8|jr4d}tj|jt|�t|�d�g|�Ri|��dS)Nz [Kombu connection:{id:#x}] {msg})�id�msg)rKr]�debug�formatrer:)rNrf�argsrP�fmtrRrRrS�_debug s��zConnection._debugcCs|jddd�S)z+Establish connection to server immediately.rF��max_retries�reraise_as_library_errors��_ensure_connectionrZrRrRrS�connect&s�zConnection.connectcCs8|�d�|j�|j�}tr4ddlm}||dd�S|S)z Create and return a new channel.zcreate channelr)�
Logwrappedz
kombu.channelz[Kombu channel:{0.channel_id}] )rkr1Zcreate_channelrc�_log_channelZutils.debugrr)rNZchanrrrRrRrS�channel,s
�zConnection.channel�cCs|jj|j|d�S)a�Check heartbeats.

        Allow the transport to perform any periodic tasks
        required to make heartbeats work.  This should be called
        approximately every second.

        If the current transport does not support heartbeats then
        this is a noop operation.

        Arguments:
        ---------
            rate (int): Rate is how often the tick is called
                compared to the actual heartbeat value.  E.g. if
                the heartbeat is set to 3 seconds, and the tick
                is called every 3 / 2 seconds, then the rate is 2.
                This value is currently unused by any transports.
        )�rate)r1�heartbeat_checkrc)rNrvrRrRrSrw6szConnection.heartbeat_checkcKs|jj|jfi|��S)z�Wait for a single event from the server.

        Arguments:
        ---------
            timeout (float): Timeout in seconds before we give up.

        Raises
        ------
            socket.timeout: if the timeout is exceeded.
        )r1�drain_eventsrc�rNrPrRrRrSrxJszConnection.drain_eventsc	Cs,z|��Wn|j|jy&Yn0dS)z>Close given channel, but ignore connection and channel errors.N)rU�connection_errors�channel_errors�rNrtrRrRrS�maybe_close_channelWszConnection.maybe_close_channelc
Cs^|j��|jr|�|j�|jrZz|j�|j�Wn|jtt	j
fyRYn0d|_dSra)rMrV�_default_channelr}�_connectionr1Zclose_connectionrz�AttributeError�socket�errorrZrRrRrS�_do_close_self^s
zConnection._do_close_selfcCs$|��|��|�d�d|_dS)z;Really close connection, even if part of a connection pool.�closedTN)r��_do_close_transportrkrWrZrRrRrS�_closejs
zConnection._closecCs|jrd|j_d|_dSra)�
_transport�clientrZrRrRrSr�qszConnection._do_close_transportcCs�z|jj}Wnbtynt��}t�|�z0z|��WntjyNYn0Wt�|�nt�|�0Yn0||j�|�	�|j
��d|_dSra)r�Z_collectr�r��getdefaulttimeout�setdefaulttimeoutr��timeoutrr�rMrV)rN�socket_timeoutZgc_transportZ_timeorRrRrS�collectvs


zConnection.collectcCs|��dS)zClose the connection (if open).N�r�rZrRrRrS�release�szConnection.releasecOs|j|i|��|S)ztPublic interface of _ensure_connection for retro-compatibility.

        Returns kombu.Connection instance.
        ro�rNrirPrRrRrS�ensure_connection�szConnection.ensure_connection�Tc	sx�jr�jSd��fdd�	}	�j}
|s,�j}
|
��2t�j�jdi|	||||||d�Wd�S1sj0YdS)a�Ensure we have a connection to the server.

        If not retry establishing the connection with the settings
        specified.

        Arguments:
        ---------
            errback (Callable): Optional callback called each time the
                connection can't be established.  Arguments provided are
                the exception raised and the interval that will be
                slept ``(exc, interval)``.

            max_retries (int): Maximum number of times to retry.
                If this limit is exceeded the connection error
                will be re-raised.

            interval_start (float): The number of seconds we start
                sleeping for.
            interval_step (float): How many seconds added to the interval
                for each retry.
            interval_max (float): Maximum number of seconds to sleep between
                each retry.
            callback (Callable): Optional callback that is called for every
                internal iteration (1 s).
            timeout (int): Maximum amount of time in seconds to spend
                attempting to connect, total over all retries.
        rcs8��|�}|rt|�}�r$�||����|r4|SdS�Nr)�completes_cyclerHr[)�excZ	intervals�retries�interval�round��errbackrNrRrS�on_error�s

z/Connection._ensure_connection.<locals>.on_errorrR)r�N)r)�	connectedr�_reraise_as_library_errors�_dummy_contextr�_connection_factory�recoverable_connection_errors)rNr�rm�interval_start�
interval_step�interval_max�callbackrnr�r��ctxrRr�rSrp�s!
�zConnection._ensure_connectionc
cs�z
dVWn|||fy"�Ynf|jyT}z|t|��|�WYd}~n<d}~0|jy�}z|t|��|�WYd}~n
d}~00dSra)r�r:�recoverable_channel_errors)rN�ConnectionErrorZChannelErrorr�rRrRrSr��s
"z%Connection._reraise_as_library_errorsccs
dVdSrarRrZrRrRrSr��szConnection._dummy_contextcCs|jr|dt|j�SdS)z?Return true if the cycle is complete after number of `retries`.rT)rC�len)rNr�rRrRrSr��szConnection.completes_cyclecCs&|jr"||jur"|�|j�d|_dS)z2Revive connection after connection re-established.N)r~r})rNZnew_channelrRrRrS�revive�szConnection.reviverc

sN�durt������������	f
dd�}
�j�d�|
_�j|
_�j|
_|
S)a.Ensure operation completes.

        Regardless of any channel/connection errors occurring.

        Retries by establishing the connection, and reapplying
        the function.

        Arguments:
        ---------
            obj: The object to ensure an action on.
            fun (Callable): Method to apply.

            errback (Callable): Optional callback called each time the
                connection can't be established.  Arguments provided are
                the exception raised and the interval that will
                be slept ``(exc, interval)``.

            max_retries (int): Maximum number of times to retry.
                If this limit is exceeded the connection error
                will be re-raised.

            interval_start (float): The number of seconds we start
                sleeping for.
            interval_step (float): How many seconds added to the interval
                for each retry.
            interval_max (float): Maximum number of seconds to sleep between
                each retry.
            on_revive (Callable): Optional callback called whenever
                revival completes successfully
            retry_errors (tuple): Optional list of errors to retry on
                regardless of the connection state.

        Examples
        --------
            >>> from kombu import Connection, Producer
            >>> conn = Connection('amqp://')
            >>> producer = Producer(conn)

            >>> def errback(exc, interval):
            ...     logger.error('Error: %r', exc, exc_info=1)
            ...     logger.info('Retry in %s seconds.', interval)

            >>> publish = conn.ensure(producer, producer.publish,
            ...                       errback=errback, max_retries=3)
            >>> publish({'hello': 'world'}, routing_key='dest')
        Nc

s�d}�	j}�	j}t�	jd�}�	�����td�D�]�}z"�|i|��WWd�S�y�}z.�durz|�krz��	jd|dd�WYd}~q0d}~0|�y\}z�|r�|s���dur�|�krʂ�	jd|dd��	���o�|d�d}�du�rt�|d�}�	j	�|���dd��	j
}	��|	���r@�|	�|d7}WYd}~q0d}~0|�y�}zB�du�r�|�k�r���	jd	|dd���o��|d�WYd}~q0d}~00q0Wd�n1�s�0YdS)
Nrr�zensure retry policy error: %rr)�exc_infozensure connection error: %rF)rnzensure channel error: %r)r�r��hasattrr1r�rrkr��maxrp�default_channelr�)
rirPZgot_connectionZconn_errorsZchan_errorsZhas_modern_errorsr�r�Zremaining_retriesrt�
r��funr�r�r�rm�obj�	on_revive�retry_errorsrNrRrS�_ensured"s^�"��
�
�z#Connection.ensure.<locals>._ensuredz	(ensured))�tuple�__name__�__doc__�
__module__)rNr�r�r�rmr�r�r�r�r�r�rRr�rS�ensure�s11zConnection.ensurecs6|g�G��fdd�d�}||�}|j||fi|��S)aDecorator for functions supporting a ``channel`` keyword argument.

        The resulting callable will retry calling the function if
        it raises connection or channel related errors.
        The return value will be a tuple of ``(retval, last_created_channel)``.

        If a ``channel`` is not provided, then one will be automatically
        acquired (remember to close it afterwards).

        See Also
        --------
            :meth:`ensure` for the full list of supported keyword arguments.

        Example:
        -------
            >>> channel = connection.channel()
            >>> try:
            ...    ret, channel = connection.autoretry(
            ...         publish_messages, channel)
            ... finally:
            ...    channel.close()
        csReZdZe�dd�Ze�dd�Ze�dd�Zdd�Z�fdd�Z��fd	d
�ZdS)z%Connection.autoretry.<locals>.Revivalr�Nr�r�cSs
||_dSra)rc)rNrcrRrRrSrTvsz.Connection.autoretry.<locals>.Revival.__init__cs|�d<dSr�rRr|)�channelsrRrSr�ysz,Connection.autoretry.<locals>.Revival.revivecs8�ddur|�|jj��|d�di|���dfS)Nrrt)r�rcr�r��r�r�rRrS�__call__|sz.Connection.autoretry.<locals>.Revival.__call__)r�r��__qualname__�getattrr�rTr�r�rRr�rRrS�Revivalqsr�)r�)rNr�rtZensure_optionsr�r�rRr�rS�	autoretryXszConnection.autoretrycCs|��|d�S)N)r�)rrZrRrRrS�create_transport�szConnection.create_transportcCs |j}|rt|t�rt|�}|S)z'Get the currently used transport class.)r_r9r:r)rNr_rRrRrSr�szConnection.get_transport_clscKs$|jfit|jdd�fi|����S)z3Create a copy of the connection with same settings.F��resolve)�	__class__rX�_inforyrRrRrS�clone�szConnection.clonecCs|j�|j�Sra)r1�get_heartbeat_intervalrcrZrRrRrSr��sz!Connection.get_heartbeat_intervalcCs|j}|r|j�||�}|jj}|jsT|�d�rTt�d|�d��d��|�d�}n|j}|jrp|j�d|��}d|fd|j	p�|�d�fd|j
p�|�d�fd|jp�|�d�fd|jp�|�d�fd	|j
fd
|jfd|fd|jfd
|jfd|jp�|�d�fd|jfd|jfd|jfd|jff}|S)Nr*z0No hostname was supplied. Reverting to default '�'r7r+r,r-r.r/r0r1r2rIr3r?r4rGrO)r_�resolve_aliasesrFr1Zdefault_connection_paramsr*r]r^r?r+r,r-r.r/r0r2rIr3r4rDrC)rNr�r_�Dr*�inforRrRrSr��sB����zConnection._infocCst|���S)zGet connection info.)rXr�rZrRrRrSr��szConnection.infoc	Cs&t|j|j|j|j|j|jt|j��Sra)	rr_r*r+r,r-r.�reprrIrZrRrRrS�
__eqhash__�s�zConnection.__eqhash__z**r.r+r,r-r1r:)�returnc
Cs�|jpd}|jjrd|j}z|j�|||�WSty<Yn0|jrT|j�d|��}|s`t|�}|S|jr�|j�d|��}|s�t|�}|S|��}||�\}}}	}
}t|||||	t	|
�||d�S)z*Convert connection parameters to URL form.r)r7)�sanitize�mask)
r*r1rA�as_uri�NotImplementedErrorr?rr�rr)rNZinclude_passwordr�Z	getfieldsr*Zconnection_as_uri�fieldsr.r+r,Zvhostr1rRrRrSr��s2
��zConnection.as_uricKst||fi|��S)a/Pool of connections.

        See Also
        --------
            :class:`ConnectionPool`.

        Arguments:
        ---------
            limit (int): Maximum number of active connections.
                Default is no limit.

        Example:
        -------
            >>> connection = Connection('amqp://')
            >>> pool = connection.Pool(2)
            >>> c1 = pool.acquire()
            >>> c2 = pool.acquire()
            >>> c3 = pool.acquire()
            Traceback (most recent call last):
              File "<stdin>", line 1, in <module>
              File "kombu/connection.py", line 354, in acquire
              raise ConnectionLimitExceeded(self.limit)
                kombu.exceptions.ConnectionLimitExceeded: 2
            >>> c1.release()
            >>> c3 = pool.acquire()
        )r!�rN�limitrPrRrRrS�Pool�szConnection.PoolcKst||fi|��S)a'Pool of channels.

        See Also
        --------
            :class:`ChannelPool`.

        Arguments:
        ---------
            limit (int): Maximum number of active channels.
                Default is no limit.

        Example:
        -------
            >>> connection = Connection('amqp://')
            >>> pool = connection.ChannelPool(2)
            >>> c1 = pool.acquire()
            >>> c2 = pool.acquire()
            >>> c3 = pool.acquire()
            Traceback (most recent call last):
              File "<stdin>", line 1, in <module>
              File "kombu/connection.py", line 354, in acquire
              raise ChannelLimitExceeded(self.limit)
                kombu.connection.ChannelLimitExceeded: 2
            >>> c1.release()
            >>> c3 = pool.acquire()
        )r"r�rRrRrSr"�szConnection.ChannelPoolcOs&ddlm}||p|g|�Ri|��S)z,Create new :class:`kombu.Producer` instance.r)�Producer)�	messagingr�)rNrtrirPr�rRrRrSr�szConnection.ProducercOs(ddlm}||p||g|�Ri|��S)z,Create new :class:`kombu.Consumer` instance.r)�Consumer)r�r�)rN�queuesrtrirPr�rRrRrSr�!szConnection.Consumerc	Ks*ddlm}||p||||||fi|��S)a�Simple persistent queue API.

        Create new :class:`~kombu.simple.SimpleQueue`, using a channel
        from this connection.

        If ``name`` is a string, a queue and exchange will be automatically
        created using that name as the name of the queue and exchange,
        also it will be used as the default routing key.

        Arguments:
        ---------
            name (str, kombu.Queue): Name of the queue/or a queue.
            no_ack (bool): Disable acknowledgments. Default is false.
            queue_opts (Dict): Additional keyword arguments passed to the
                constructor of the automatically created :class:`~kombu.Queue`.
            queue_args (Dict): Additional keyword arguments passed to the
                constructor of the automatically created :class:`~kombu.Queue`
                for setting implementation extensions (e.g., in RabbitMQ).
            exchange_opts (Dict): Additional keyword arguments passed to the
                constructor of the automatically created
                :class:`~kombu.Exchange`.
            channel (ChannelT): Custom channel to use. If not specified the
                connection default channel is used.
        r)�SimpleQueue)�simpler�)	rN�name�no_ack�
queue_opts�
queue_args�
exchange_optsrtrPr�rRrRrSr�&s��zConnection.SimpleQueuec	Ks*ddlm}||p||||||fi|��S)a�Simple ephemeral queue API.

        Create new :class:`~kombu.simple.SimpleQueue` using a channel
        from this connection.

        See Also
        --------
            Same as :meth:`SimpleQueue`, but configured with buffering
            semantics. The resulting queue and exchange will not be durable,
            also auto delete is enabled. Messages will be transient (not
            persistent), and acknowledgments are disabled (``no_ack``).
        r)�SimpleBuffer)r�r�)	rNr�r�r�r�r�rtrPr�rRrRrSr�Fs��zConnection.SimpleBuffercCs$|�d�|j��}|�d|�|S)Nzestablishing connection...zconnection established: %r)rkr1Zestablish_connection)rN�connrRrRrS�_establish_connectionZs

z Connection._establish_connectioncCs||jjjvSra)r1�
implements�
exchange_type)rNr�rRrRrS�supports_exchange_type`sz!Connection.supports_exchange_typecCsd|���dt|�d�d�S)Nz
<Connection: z at z#x�>)r�rerZrRrRrS�__repr__cszConnection.__repr__cCs|��Sra)r�rZrRrRrS�__copy__fszConnection.__copy__cCs|jt|�����dfSra)r�r�r��valuesrZrRrRrS�
__reduce__iszConnection.__reduce__cCs|SrarRrZrRrRrS�	__enter__lszConnection.__enter__ztype[BaseException] | NonezBaseException | NonezTracebackType | None�None)�exc_type�exc_val�exc_tbr�cCs|��dSra)r��rNr�r�r�rRrRrS�__exit__oszConnection.__exit__cCs|j�|j�Sra)r1�qos_semantics_matches_specrcrZrRrRrSr�wsz%Connection.qos_semantics_matches_speccCs�d|ji}|j}|r�d|vr(|d|d<d|vr<|d|d<d|vrP|d|d<d|vrd|d|d<d|vrx|d|d<d|vr�|d|d<d|vr�|d|d<|S)	Nr�rmr�r�r�Zconnect_retries_timeoutr�r�)r2rI)rN�	conn_optsZtransport_optsrRrRrS�_extract_failover_opts{s&
�z!Connection._extract_failover_optscCs |jo|jduo|j�|j�S)z3Return true if the connection has been established.N)rWrr1Zverify_connectionrZrRrRrSr��s
��zConnection.connectedcCs$|js |js|jddd�S|jSdS)z�The underlying connection object.

        Warning:
        -------
            This instance is transport specific, so do not
            depend on the interface of this object.
        rFrlN)rWr�rprrZrRrRrSrc�s	�zConnection.connectioncCs&|j��d|_|��|_d|_|jS)NF)rMrVr~r�rrWrZrRrRrSr��s


zConnection._connection_factoryrcCs2|��}|jfi|��|jdur,|��|_|jS)awDefault channel.

        Created upon access and closed when the connection is closed.

        Note:
        ----
            Can be used for automatic channel handling when you only need one
            channel, and also it is the channel implicitly used if
            a connection is passed instead of a channel, to functions that
            require a channel.
        N)r�rpr~rt)rNr�rRrRrSr��s


zConnection.default_channelcCsd�|jt|j�g�S)z5The host as a host name/port pair separated by colon.�:)�joinr*r:r.rZrRrRrS�host�szConnection.hostcCs|jdur|��|_|jSra)r�r�rZrRrRrSr1�s

zConnection.transportcCs|jjS)z�AMQP Management API.

        Experimental manager that can be used to manage/monitor the broker
        instance.

        Not available for all transports.
        )r1�managerrZrRrRrSr��s	zConnection.managercOs|jj|i|��Sra)r1�get_managerr�rRrRrSr��szConnection.get_managercCs0z|��jWSty*|j|jYS0dS)z�Recoverable connection errors.

        List of connection related exceptions that can be recovered from,
        but where the connection must be closed and re-established first.
        N)rr�r�rzr{rZrRrRrSr��sz(Connection.recoverable_connection_errorscCs&z|��jWSty YdS0dS)z�Recoverable channel errors.

        List of channel related exceptions that can be automatically
        recovered from without re-establishing the connection.
        rRN)rr�r�rZrRrRrSr��sz%Connection.recoverable_channel_errorscCs
|��jS)z8List of exceptions that may be raised by the connection.)rrzrZrRrRrSrz�szConnection.connection_errorscCs
|��jS)z5List of exceptions that may be raised by the channel.)rr{rZrRrRrSr{�szConnection.channel_errorscCs
|jjjSra)r1r�Z
heartbeatsrZrRrRrS�supports_heartbeatsszConnection.supports_heartbeatscCs
|jjjSra)r1r�ZasynchronousrZrRrRrS�
is_eventedszConnection.is_evented)r)NNNNFFNr(NNNrr%N)ru)N)NNrurur�NTN)NNrrrNN)N)T)N)N)N)NN)NNNNN)NNNNN)]r�r�r�r�r.r-r2rWrr~r�rKr?rMrrIrGr4r�rEr*r+r,r0r3rTrYr[rBrbrkrqrtrwrxr}r�r�r�r�r�rUr�rprr
ZOperationalErrorr�r�r�r�r�r�r�rr�r�r�r�r�rr�r�r"r�r�r�r�r�r�r�r�r�r�r��propertyr�r�r�rcr�r�r�r1rr�r�r�r�rzr{r�r�rRrRrRrSr ?s�E�
D



	�
9�

�
j
,
%��



�
 �












r cs4eZdZdZ�fdd�Zdd�Z�fdd�Z�ZS)�PooledConnectionz�Wraps :class:`kombu.Connection`.

    This wrapper modifies :meth:`kombu.Connection.__exit__` to close the connection
    in case any exception occurred while the context was active.
    cs||_t�jfi|��dSra)�_pool�superrT)rN�poolrP�r�rRrSrTszPooledConnection.__init__cCs|SrarRrZrRrRrSr�szPooledConnection.__enter__cs,|dur|jjr|j�|�t��|||�Sra)r�r��replacerr�r�rrRrSr�szPooledConnection.__exit__)r�r�r�r�rTr�r��
__classcell__rRrRrrSr�sr�csleZdZdZejZdZd�fdd�	Zdd�Z	dd	�Z
d
d�Zdd
d�Ze
ddd��Zdd�Zdd�Z�ZS)r!zPool of connections.TNcs||_t�j|d�dS�N)r��rcrrT�rNrcr�rPrrRrSrT(szConnectionPool.__init__cCst|fit|jjdd����S)NFr�)r�rXrcr�rZrRrRrS�new,szConnectionPool.newcCs&z|�d�Wnty Yn0dS)NZreleased)rkr��rN�resourcerRrRrS�release_resource/szConnectionPool.release_resourcecCs|��dSrar�r	rRrRrS�close_resource5szConnectionPool.close_resource皙�����?cCst|t�s|�|�SdSra)r9rr�)rNr
r�rRrRrS�collect_resource8s
zConnectionPool.collect_resourceFccs<|j|d��}||jfVWd�n1s.0YdS)N)�block)�acquirer�)rNrrcrRrRrS�acquire_channel<szConnectionPool.acquire_channelcCs>|jr:|jj}t|�|jt|j�kr:|j�t|j��qdSra)r��	_resource�queuer��_dirty�
put_nowaitrr)rN�qrRrRrS�setupAszConnectionPool.setupcCst|�r|�}|�d�|S)NZacquired)�callablerkr	rRrRrS�prepareHs
zConnectionPool.prepare)N)r
)F)r�r�r�r�r
ZConnectionLimitExceeded�
LimitExceededZclose_after_forkrTrrrrrrrrrrRrRrrSr!"s
r!cs@eZdZdZejZd�fdd�	Zdd�Zdd�Z	d	d
�Z
�ZS)r"zPool of channels.Ncs||_t�j|d�dSrrrrrRrSrTTszChannelPool.__init__cCst|jj�Sra)rrcrtrZrRrRrSrXszChannelPool.newcCsD|��}|jr@|jj}t|�|jt|j�kr@|j�t|��qdSra)rr�rrr�rrr)rNrtrrRrRrSr[s
zChannelPool.setupcCst|�r|�}|Sra)rr|rRrRrSrcszChannelPool.prepare)N)r�r�r�r�r
ZChannelLimitExceededrrTrrrrrRrRrrSr"Osr"zChannel | Connectionr)rtr�cCst|�r|jS|S)z�Get channel from object.

    Return the default channel if argument is a connection instance,
    otherwise just return the channel given.
    )�
is_connectionr�)rtrRrRrS�
maybe_channelisrrzTypeGuard[Connection])r�r�cCs
t|t�Sra)r9r )r�rRrRrSrtsr)Dr��
__future__r�osr��sys�
contextlibr�	itertoolsrr�operatorr�typingrrr0r	r\�ImportErrorZkombur
�logrr
r
r1rrZutils.collectionsrZutils.functionalrrrrZ
utils.objectsrZ	utils.urlrrrrrZkombu.transport.virtualr�version_infoZtyping_extensionsr�typesr�__all__r�r]Zroundrobin_failoverr�rE�environrFrJrsr ZBrokerConnectionr�r!r"rrrRrRrRrS�<module>sj
��S-