HEX
Server: LiteSpeed
System: Linux kapuas.iixcp.rumahweb.net 5.14.0-427.42.1.el9_4.x86_64 #1 SMP PREEMPT_DYNAMIC Fri Nov 1 14:58:02 EDT 2024 x86_64
User: mirz4654 (1666)
PHP: 8.1.33
Disabled: system,exec,escapeshellarg,escapeshellcmd,passthru,proc_close,proc_get_status,proc_nice,proc_open,proc_terminate,shell_exec,popen,pclose,dl,pfsockopen,leak,apache_child_terminate,posix_kill,posix_mkfifo,posix_setsid,posix_setuid,posix_setpgid,ini_alter,show_source,define_syslog_variables,symlink,syslog,openlog,openlog,closelog,ocinumcols,listen,chgrp,apache_note,apache_setenv,debugger_on,debugger_off,ftp_exec,dll,ftp,myshellexec,socket_bind,mail,posix_getwpuid
Upload Files
File: //usr/local/lib/python3.9/site-packages/celery/worker/consumer/__pycache__/gossip.cpython-39.pyc
a

X>h��@s�dZddlmZddlmZddlmZddlmZddl	m
Z
ddlmZddl
mZmZdd	lmZdd
lmZddlmZdd
lmZdZee�ZejejZZGdd�dej�ZdS)z)Worker <-> Worker communication Bootstep.�)�defaultdict)�partial)�heappush)�
itemgetter)�Consumer)�	DummyLock)�ContentDisallowed�DecodeError)�	bootsteps)�
get_logger)�Bunch�)�Mingle)�Gossipcs�eZdZdZdZefZeddddddd�Zd	d
hZ	d-�fdd�	Z
dd�Zd.dd�Zdd�Z
dd�Z�fdd�Zdd�Zdd�Zdd �Zd!d"�Zd#d$�Zd%d&�Zd'd(�Zd)d*�Zd+d,�Z�ZS)/rzfBootstep consuming events from other workers.

    This keeps the logical clock value up to date.
    �id�clock�hostname�pid�topic�action�cverZamqpZredisF�@�@cs|o|�|j�|_|j|_||_|jjj|_|j|_d�|jt|j	�g�|_
tt�t�t�d�|_
|j|_|jr�|jjj|j|jdd�|_|jr�t�|_|jj|_||_||_d|_tt�|_i|_|j|j d�|_!|jj"|_"d|j#i|_$t%�j&|fi|��dS)N�.)�	node_join�
node_leave�	node_lostr
)�on_node_join�
on_node_leaveZmax_tasks_in_memory)zworker.electzworker.elect.ack�task)'�compatible_transport�app�enabledZgossip�events�Receiverr�join�strr�
full_hostnamer�set�on�timer�Staterr�stateZhubrZ_mutex�event�update_state�interval�heartbeat_interval�_trefr�list�consensus_requests�consensus_replies�on_elect�on_elect_ack�event_handlersr�	call_task�election_handlers�super�__init__)�self�cZwithout_gossipr/r0�kwargs��	__class__��G/usr/local/lib/python3.9/site-packages/celery/worker/consumer/gossip.pyr;$sB��

�
�zGossip.__init__cCs:|���}|jj|jvWd�S1s,0YdS�N)Zconnection_for_read�	transportZdriver_type�compatible_transports)r<r!�connrArArBr Ms
zGossip.compatible_transportNcCs$g|j|<|jjd|||dd�dS)Nzworker-electr
)rrrr)r4�
dispatcher�send)r<rrrrArArB�electionQs

�zGossip.electionc
CsJz|j�|���Wn0tyD}zt�d|�WYd}~n
d}~00dS)NzCould not call task: %r)r!�	signatureZapply_async�	Exception�logger�	exception)r<r�excrArArBr8XszGossip.call_taskc

Cs�z|�|�\}}}}}}}Wn0tyL}	zt�d|	�WYd}	~	Sd}	~	00t|j|||�d|��||f�|jjd|d�dS)Nz!election request missing field %srzworker-elect-ack)r)�_cons_stamp_fields�KeyErrorrLrMrr3rGrH)
r<r-Zid_rrrrr�_rNrArArBr5^s�
�"�zGossip.on_electcst��|�|j|_dSrC)r:�startZevent_dispatcherrG)r<r=r?rArBrRjszGossip.startc
Cs�|d}z|j|}Wnty*YdS0t|j���}|�|d�t|�t|�kr�|j�|j	|�\}}}}||j
kr�td|�z|j|}	Wnty�t
�d|�Yq�0|	|�ntd||�|j	�|d�|j�|d�dS)NrrzI won the election %rzUnknown election topic %rznode %s elected for %r)r4rPr(r,�
alive_workers�append�lenrZ	sort_heapr3r'�infor9rLrM�pop)
r<r-rZrepliesrSrQZleaderrr�handlerrArArBr6ns*�


zGossip.on_elect_ackcCs td|j�|�|jj|�dS)Nz%s joined the party)�debugr�_call_handlersr)r�r<�workerrArArBr�szGossip.on_node_joincCs td|j�|�|jj|�dS)Nz%s left)rYrrZr)rr[rArArBr�szGossip.on_node_leavecCs td|j�|�|jj|�dS)Nzmissed heartbeat from %s)rVrrZr)rr[rArArB�on_node_lost�szGossip.on_node_lostcOsT|D]J}z||i|��WqtyL}zt�d||�WYd}~qd}~00qdS)Nz!Ignored error from handler %r: %r)rKrLrM)r<�handlers�argsr>rXrNrArArBrZ�s�zGossip._call_handlerscCs,|jdur|j��|j�|j|j�|_dSrC)r1�cancelr*Zcall_repeatedlyr/�periodic)r<rArArB�register_timer�s

zGossip.register_timercCsR|jj}t�}|��D]}|js|�|�|�|�q|D]}|�|jd�q:dSrC)	r,�workersr(�values�alive�addr]rWr)r<rcZdirtyr\rArArBra�s
zGossip.periodiccCs>|��|j|d|jd�}t||jgt|j|j�|jdd�gS)Nzworker.#)�routing_keyZ	queue_ttlT)�queues�
on_message�acceptZno_ack)	rbr$r0r�queuerriZevent_from_messagerj)r<ZchannelZevrArArB�
get_consumers�s��zGossip.get_consumersc	
Cs�|jd}|�dd�ddkr"dSz|j|}WntyBYn0||j�S|j�d�pb|jd}||jkr�z||j�\}}|�|�Wq�t	t
tfy�}zt�
|�WYd}~q�d}~00n
|j��dS)Nrgrr
rrr)Z
delivery_info�splitr7rP�payload�headers�getrr.r	r�	TypeErrorrL�errorrZforward)	r<�prepare�message�_typerXrrQr-rNrArArBri�s$

�
"zGossip.on_message)Frr)N)�__name__�
__module__�__qualname__�__doc__�labelr�requiresrrOrEr;r rIr8r5rRr6rrr]rZrbrarlri�
__classcell__rArAr?rBrs0��)

rN)ry�collectionsr�	functoolsr�heapqr�operatorrZkomburZkombu.asynchronous.semaphorerZkombu.exceptionsrr	Zceleryr
Zcelery.utils.logrZcelery.utils.objectsrZmingler�__all__rvrLrYrVZConsumerSteprrArArArB�<module>s