HEX
Server: LiteSpeed
System: Linux kapuas.iixcp.rumahweb.net 5.14.0-427.42.1.el9_4.x86_64 #1 SMP PREEMPT_DYNAMIC Fri Nov 1 14:58:02 EDT 2024 x86_64
User: mirz4654 (1666)
PHP: 8.1.33
Disabled: system,exec,escapeshellarg,escapeshellcmd,passthru,proc_close,proc_get_status,proc_nice,proc_open,proc_terminate,shell_exec,popen,pclose,dl,pfsockopen,leak,apache_child_terminate,posix_kill,posix_mkfifo,posix_setsid,posix_setuid,posix_setpgid,ini_alter,show_source,define_syslog_variables,symlink,syslog,openlog,openlog,closelog,ocinumcols,listen,chgrp,apache_note,apache_setenv,debugger_on,debugger_off,ftp_exec,dll,ftp,myshellexec,socket_bind,mail,posix_getwpuid
Upload Files
File: //usr/local/lib/python3.9/site-packages/celery/security/serialization.py
"""Secure serializer."""
from kombu.serialization import dumps, loads, registry
from kombu.utils.encoding import bytes_to_str, ensure_bytes, str_to_bytes

from celery.app.defaults import DEFAULT_SECURITY_DIGEST
from celery.utils.serialization import b64decode, b64encode

from .certificate import Certificate, FSCertStore
from .key import PrivateKey
from .utils import get_digest_algorithm, reraise_errors

__all__ = ('SecureSerializer', 'register_auth')

# Note: we guarantee that this value won't appear in the serialized data,
# so we can use it as a separator.
# If you change this value, make sure it's not present in the serialized data.
DEFAULT_SEPARATOR = str_to_bytes("\x00\x01")


class SecureSerializer:
    """Signed serializer."""

    def __init__(self, key=None, cert=None, cert_store=None,
                 digest=DEFAULT_SECURITY_DIGEST, serializer='json'):
        self._key = key
        self._cert = cert
        self._cert_store = cert_store
        self._digest = get_digest_algorithm(digest)
        self._serializer = serializer

    def serialize(self, data):
        """Serialize data structure into string."""
        assert self._key is not None
        assert self._cert is not None
        with reraise_errors('Unable to serialize: {0!r}', (Exception,)):
            content_type, content_encoding, body = dumps(
                data, serializer=self._serializer)

            # What we sign is the serialized body, not the body itself.
            # this way the receiver doesn't have to decode the contents
            # to verify the signature (and thus avoiding potential flaws
            # in the decoding step).
            body = ensure_bytes(body)
            return self._pack(body, content_type, content_encoding,
                              signature=self._key.sign(body, self._digest),
                              signer=self._cert.get_id())

    def deserialize(self, data):
        """Deserialize data structure from string."""
        assert self._cert_store is not None
        with reraise_errors('Unable to deserialize: {0!r}', (Exception,)):
            payload = self._unpack(data)
            signature, signer, body = (payload['signature'],
                                       payload['signer'],
                                       payload['body'])
            self._cert_store[signer].verify(body, signature, self._digest)
        return loads(body, payload['content_type'],
                     payload['content_encoding'], force=True)

    def _pack(self, body, content_type, content_encoding, signer, signature,
              sep=DEFAULT_SEPARATOR):
        fields = sep.join(
            ensure_bytes(s) for s in [b64encode(signer), b64encode(signature),
                                      content_type, content_encoding, body]
        )
        return b64encode(fields)

    def _unpack(self, payload, sep=DEFAULT_SEPARATOR):
        raw_payload = b64decode(ensure_bytes(payload))
        v = raw_payload.split(sep, maxsplit=4)
        return {
            'signer': b64decode(v[0]),
            'signature': b64decode(v[1]),
            'content_type': bytes_to_str(v[2]),
            'content_encoding': bytes_to_str(v[3]),
            'body': v[4],
        }


def register_auth(key=None, key_password=None, cert=None, store=None,
                  digest=DEFAULT_SECURITY_DIGEST,
                  serializer='json'):
    """Register security serializer."""
    s = SecureSerializer(key and PrivateKey(key, password=key_password),
                         cert and Certificate(cert),
                         store and FSCertStore(store),
                         digest, serializer=serializer)
    registry.register('auth', s.serialize, s.deserialize,
                      content_type='application/data',
                      content_encoding='utf-8')