File: //usr/local/lib/python3.9/site-packages/celery/contrib/testing/__pycache__/mocks.cpython-39.pyc
[Binary content: compiled CPython 3.9 bytecode. The embedded source path is /usr/local/lib/python3.9/site-packages/celery/contrib/testing/mocks.py. Only the readable strings recovered from the dump are listed below.]

Module docstring:
    Useful mocks for unit testing.

Imported names:
    numbers
    datetime, timedelta (from datetime)
    Any, Mapping, Sequence (from typing)
    Mock (from unittest.mock)
    Celery (from celery)
    Signature (from celery.canvas)

TaskMessage
    Create task message in protocol 2 format.

TaskMessage1
    Create task message in protocol 1 format.

task_message_from_sig
    Create task message from :class:`celery.Signature`.

    Example:
        >>> m = task_message_from_sig(app, add.s(2, 2))
        >>> amqp_client.basic_publish(m, exchange='ex', routing_key='rkey')

_ContextMock (defines __enter__ and __exit__)
    Dummy class implementing __enter__ and __exit__.

    The :keyword:`with` statement requires these to be implemented
    in the class, not just the instance.

ContextMock
    Mock that mocks :keyword:`with` statement contexts.
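The _ContextMock docstring in this file says that the with statement needs __enter__ and __exit__ on the class, not just on the instance. The following is a minimal sketch of that point using only the standard library. It is not the Celery implementation: Plain, ContextMockSketch, instance_only_with_fails and enter_returns_same_object are hypothetical names introduced for illustration.

```python
from unittest.mock import Mock


class Plain:
    """Ordinary class with no context-manager methods (hypothetical)."""


def instance_only_with_fails():
    """Return True if instance-level __enter__/__exit__ are ignored by `with`."""
    obj = Plain()
    # Attach the methods to the instance only, not to the class.
    obj.__enter__ = lambda: obj
    obj.__exit__ = lambda *exc_info: None
    try:
        with obj:
            pass
    except (AttributeError, TypeError):
        # The exception type differs between CPython versions, so catch both.
        return True
    return False


class ContextMockSketch(Mock):
    """Hypothetical stand-in: the methods are defined on the class itself."""

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        # A falsy return value lets exceptions raised in the body propagate.
        return None


def enter_returns_same_object():
    """Use the sketch in a `with` block and check what `as` binds to."""
    cm = ContextMockSketch()
    with cm as entered:
        entered.do_work(1)
    # Calls made inside the block are recorded on the mock as usual.
    cm.do_work.assert_called_once_with(1)
    return entered is cm
```

Defining the two methods in the class body is what lets the with statement find them, because special methods are looked up on the type of the object rather than on the object itself.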