HEX
Server: LiteSpeed
System: Linux kapuas.iixcp.rumahweb.net 5.14.0-427.42.1.el9_4.x86_64 #1 SMP PREEMPT_DYNAMIC Fri Nov 1 14:58:02 EDT 2024 x86_64
User: mirz4654 (1666)
PHP: 8.1.33
Disabled: system,exec,escapeshellarg,escapeshellcmd,passthru,proc_close,proc_get_status,proc_nice,proc_open,proc_terminate,shell_exec,popen,pclose,dl,pfsockopen,leak,apache_child_terminate,posix_kill,posix_mkfifo,posix_setsid,posix_setuid,posix_setpgid,ini_alter,show_source,define_syslog_variables,symlink,syslog,openlog,openlog,closelog,ocinumcols,listen,chgrp,apache_note,apache_setenv,debugger_on,debugger_off,ftp_exec,dll,ftp,myshellexec,socket_bind,mail,posix_getwpuid
Upload Files
File: //usr/local/lib/python3.9/site-packages/kombu/transport/__pycache__/filesystem.cpython-39.pyc
a

X>h�(�@szdZddlmZddlZddlZddlZddlZddlmZddl	m
Z
ddlmZddl
mZddlmZdd	lmZdd
lmZmZddlmZmZddlmZd
Zd�eee��Zej dk�rddl!Z!ddl"Z"ddl#Z#e"j$Z%dZ&e"j'Z(e!�)�Z*dd�Z+dd�Z,n>ej dk�r<ddl-Z-ddl-m%Z%m&Z&dd�Z+dd�Z,ne.d��edgd��Z/Gdd�dej0�Z0Gdd�dej1�Z1dS)a�	File-system Transport module for kombu.

Transport using the file-system as the message store. Messages written to the
queue are stored in `data_folder_in` directory and
messages read from the queue are read from `data_folder_out` directory. Both
directories must be created manually. Simple example:

* Producer:

.. code-block:: python

    import kombu

    conn = kombu.Connection(
        'filesystem://', transport_options={
            'data_folder_in': 'data_in', 'data_folder_out': 'data_out'
        }
    )
    conn.connect()

    test_queue = kombu.Queue('test', routing_key='test')

    with conn as conn:
        with conn.default_channel as channel:
            producer = kombu.Producer(channel)
            producer.publish(
                        {'hello': 'world'},
                        retry=True,
                        exchange=test_queue.exchange,
                        routing_key=test_queue.routing_key,
                        declare=[test_queue],
                        serializer='pickle'
            )

* Consumer:

.. code-block:: python

    import kombu

    conn = kombu.Connection(
        'filesystem://', transport_options={
            'data_folder_in': 'data_out', 'data_folder_out': 'data_in'
        }
    )
    conn.connect()

    def callback(body, message):
        print(body, message)
        message.ack()

    test_queue = kombu.Queue('test', routing_key='test')

    with conn as conn:
        with conn.default_channel as channel:
            consumer = kombu.Consumer(
                conn, [test_queue], accept=['pickle']
            )
            consumer.register_callback(callback)
            with consumer:
                conn.drain_events(timeout=1)

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: Yes
* Supports Priority: No
* Supports TTL: No

Connection String
=================
Connection string is in the following format:

.. code-block::

    filesystem://

Transport Options
=================
* ``data_folder_in`` - directory where are messages stored when written
  to queue.
* ``data_folder_out`` - directory from which are messages read when read from
  queue.
* ``store_processed`` - if set to True, all processed messages are backed up to
  ``processed_folder``.
* ``processed_folder`` - directory where are backed up processed files.
* ``control_folder`` - directory where are exchange-queue table stored.
�)�annotationsN)�
namedtuple)�Path)�Empty)�	monotonic)�ChannelError)�virtual)�bytes_to_str�str_to_bytes)�dumps�loads)�cached_property)�rr�.�ntcCs$t�|���}t�||ddt�dS)�Create file lock.r��N)�	win32file�_get_osfhandle�filenoZ
LockFileEx�__overlapped)�file�flags�hfile�r�D/usr/local/lib/python3.9/site-packages/kombu/transport/filesystem.py�lock}srcCs"t�|���}t�|ddt�dS)�Remove file lock.rrN)rrrZUnlockFileExr)rrrrr�unlock�sr�posix)�LOCK_EX�LOCK_SHcCst�|��|�dS)rN)�fcntl�flockr)rrrrrr�scCst�|��tj�dS)rN)r"r#rZLOCK_UN)rrrrr�sz9Filesystem plugin only defined for NT and POSIX platforms�exchange_queue_t)�routing_key�pattern�queuec@s�eZdZdZdZdd�Zdd�Zdd�Zd	d
�Zdd�Z	d
d�Z
dd�Zedd��Z
edd��Zedd��Zedd��Zedd��Zedd��ZdS)�ChannelzFilesystem Channel.TcCs�|j|�d�}z^|�d�}z<t|t�tt|����}dd�|D�Wt|�|��WSt|�|��0Wn4t	y�gYSt
y�td|����Yn0dS)N�	.exchange�rcSsg|]}t|��qSr�r$��.0�qrrr�
<listcomp>��z%Channel.get_table.<locals>.<listcomp>zCannot open )�control_folder�openrr!rr	�readr�close�FileNotFoundError�OSErrorr)�self�exchanger�f_obj�exchange_tablerrr�	get_table�s$

��zChannel.get_tablec
Cs|j|�d�}|jjdd�t|p&d|p,d|p2d�}z�|��r�|jddd�}t|t�tt|�	���}dd	�|D�}	||	vr�|	�
d|�|�d�|�t
t|	���n0|jd
dd�}t|t�|g}	|�t
t|	���Wt|�|��nt|�|��0dS)Nr)T)�exist_ok�zrb+r��	bufferingcSsg|]}t|��qSrr+r,rrrr/�r0z'Channel._queue_bind.<locals>.<listcomp>�wb)r1�mkdirr$�existsr2rr rr	r3�insert�seek�writer
rrr4)
r7r8r%r&r'rZ	queue_valr9r:�queuesrrr�_queue_bind�s.�



�zChannel._queue_bindcKs*|�|�D]}|j|j|fi|��q
dS�N)r;�_putr')r7r8�payloadr%�kwargsr.rrr�_put_fanout�szChannel._put_fanoutcKs�d�ttt�d��t��|�}tj�|j	|�}zfz.t
|ddd�}t|t�|�
tt|���Wn"ty�td|�d���Yn0Wt|�|��nt|�|��0dS)	zPut `message` onto `queue`.z{}_{}.{}.msgi�r@rr>zCannot add file z
 to directoryN)�format�int�roundr�uuid�uuid4�os�path�join�data_folder_outr2rr rEr
rr6rrr4)r7r'rJrK�filename�frrrrI�s �

�
�zChannel._putcCsd|d}t�|j�}t|�}t|�dkr�|�d�}|�|�dkrFq |jrT|j}nt	�
�}zt�tj
�|j|�|�Wnty�Yq Yn0tj
�||�}z.t|d�}|��}|��|js�t�|�Wn"ty�td|�d���Yn0tt|��St��dS)zGet next message from `queue`.r�.msgr�rbzCannot read file z from queue.N)rR�listdir�data_folder_in�sorted�len�pop�find�store_processed�processed_folder�tempfile�
gettempdir�shutil�moverSrTr6r2r3r4�removerrr	r)r7r'�
queue_find�folderrVrarWrJrrr�_get�s:
�


�
zChannel._getcCs�d}d|d}t�|j�}t|�dkr~|��}z8|�|�dkrDWqtj�|j|�}t�|�|d7}Wqt	yzYq0q|S)z!Remove all messages from `queue`.rrrXr)
rRrZr[r]r^r_rSrTrfr6�r7r'�countrgrhrVrrr�_purge	s
zChannel._purgecCsNd}d|�d�}t�|j�}t|�dkrJ|��}|�|�dkr@q|d7}q|S)z<Return the number of messages in `queue` as an :class:`int`.rrrXr)rRrZr[r]r^r_rjrrr�_size"s
z
Channel._sizecCs
|jjjSrH)�
connection�client�transport_options�r7rrrrp3szChannel.transport_optionscCs|j�dd�S)Nr[Zdata_in�rp�getrqrrrr[7szChannel.data_folder_incCs|j�dd�S)NrUZdata_outrrrqrrrrU;szChannel.data_folder_outcCs|j�dd�S)Nr`Frrrqrrrr`?szChannel.store_processedcCs|j�dd�S)Nra�	processedrrrqrrrraCszChannel.processed_foldercCst|j�dd��S)Nr1�control)rrprsrqrrrr1GszChannel.control_folderN)�__name__�
__module__�__qualname__�__doc__Zsupports_fanoutr;rGrLrIrirlrm�propertyrpr
r[rUr`rar1rrrrr(�s*(




r(csZeZdZdZejjjdegd��d�Ze	Z	e�
�ZdZdZ
dZ�fdd�Zd	d
�Z�ZS)�	TransportzFilesystem Transport.F)�directZtopicZfanout)ZasynchronousZ
exchange_typer�
filesystemcs t�j|fi|��|j|_dSrH)�super�__init__�global_state�state)r7rorK��	__class__rrr[szTransport.__init__cCsdS)NzN/Arrqrrr�driver_version_szTransport.driver_version)rvrwrxryrr{Z
implements�extend�	frozensetr(ZBrokerStater��default_portZdriver_typeZdriver_namerr��
__classcell__rrr�rr{Ls
�r{)2ry�
__future__rrRrdrbrP�collectionsr�pathlibrr'r�timerZkombu.exceptionsrZkombu.transportrZkombu.utils.encodingr	r
Zkombu.utils.jsonrrZkombu.utils.objectsr
�VERSIONrT�map�str�__version__�nameZ
pywintypesZwin32conrZLOCKFILE_EXCLUSIVE_LOCKr r!ZLOCKFILE_FAIL_IMMEDIATELYZLOCK_NBZ
OVERLAPPEDrrrr"�RuntimeErrorr$r(r{rrrr�<module>sP[

��.