HEX
Server: LiteSpeed
System: Linux kapuas.iixcp.rumahweb.net 5.14.0-427.42.1.el9_4.x86_64 #1 SMP PREEMPT_DYNAMIC Fri Nov 1 14:58:02 EDT 2024 x86_64
User: mirz4654 (1666)
PHP: 8.1.33
Disabled: system,exec,escapeshellarg,escapeshellcmd,passthru,proc_close,proc_get_status,proc_nice,proc_open,proc_terminate,shell_exec,popen,pclose,dl,pfsockopen,leak,apache_child_terminate,posix_kill,posix_mkfifo,posix_setsid,posix_setuid,posix_setpgid,ini_alter,show_source,define_syslog_variables,symlink,syslog,openlog,openlog,closelog,ocinumcols,listen,chgrp,apache_note,apache_setenv,debugger_on,debugger_off,ftp_exec,dll,ftp,myshellexec,socket_bind,mail,posix_getwpuid
Upload Files
File: //lib/python3.9/site-packages/ansible_collections/awx/awx/test/awx/test_workflow_job_template.py
from __future__ import absolute_import, division, print_function

__metaclass__ = type

import pytest

from awx.main.models import WorkflowJobTemplate, NotificationTemplate


@pytest.mark.django_db
def test_create_workflow_job_template(run_module, admin_user, organization, survey_spec):
    """Create a workflow job template with extra vars and a survey.

    Runs the ``workflow_job_template`` module as ``admin_user`` and then
    checks both the module's reported result and the fields stored on the
    ``WorkflowJobTemplate`` row it created.
    """
    module_args = {
        'name': 'foo-workflow',
        'organization': organization.name,
        'extra_vars': {'foo': 'bar', 'another-foo': {'barz': 'bar2'}},
        'survey_spec': survey_spec,
        'survey_enabled': True,
        'state': 'present',
        'job_tags': '',
        'skip_tags': '',
    }
    result = run_module('workflow_job_template', module_args, admin_user)
    assert not result.get('failed', False), result.get('msg', result)

    created = WorkflowJobTemplate.objects.get(name='foo-workflow')
    # The nested extra_vars dict is compared against its JSON text form.
    expected_extra_vars = '{"foo": "bar", "another-foo": {"barz": "bar2"}}'
    assert created.extra_vars == expected_extra_vars

    # 'invocation' is dropped (if present) so the remaining result can be
    # compared exactly.
    result.pop('invocation', None)
    expected_result = {"name": "foo-workflow", "id": created.id, "changed": True}
    assert result == expected_result

    assert created.organization_id == organization.id
    assert created.survey_spec == survey_spec


@pytest.mark.django_db
def test_create_modify_no_survey(run_module, admin_user, organization, survey_spec):
    """Create a template without a survey, then re-run and expect no change.

    The first module run must report ``changed`` and leave an empty survey
    spec on the stored template; the second run, with only the name and
    organization, must report no change.
    """
    org_name = organization.name
    create_args = {
        'name': 'foo-workflow',
        'organization': org_name,
        'job_tags': '',
        'skip_tags': '',
    }
    first_run = run_module('workflow_job_template', create_args, admin_user)
    assert not first_run.get('failed', False), first_run.get('msg', first_run)
    assert first_run.get('changed', False), first_run

    template = WorkflowJobTemplate.objects.get(name='foo-workflow')
    assert template.organization_id == organization.id
    assert template.survey_spec == {}
    # 'invocation' is dropped (if present) before the exact comparison.
    first_run.pop('invocation', None)
    assert first_run == {"name": "foo-workflow", "id": template.id, "changed": True}

    # Second run with the same identifying arguments only.
    rerun_args = {'name': 'foo-workflow', 'organization': organization.name}
    second_run = run_module('workflow_job_template', rerun_args, admin_user)
    assert not second_run.get('failed', False), second_run.get('msg', second_run)
    # Defaulting to True means a result lacking 'changed' fails this check.
    assert not second_run.get('changed', True), second_run


@pytest.mark.django_db
def test_survey_spec_only_changed(run_module, admin_user, organization, survey_spec):
    """Changing only the survey spec is reported as a change and persisted.

    The module is first run without ``survey_spec`` against an existing
    template and must report no change. The spec's description is then
    edited and passed in; the run must report ``changed`` and the stored
    spec must equal the edited one.
    """
    wfjt = WorkflowJobTemplate.objects.create(organization=organization, name='foo-workflow', survey_enabled=True, survey_spec=survey_spec)
    result = run_module('workflow_job_template', {'name': 'foo-workflow', 'organization': organization.name, 'state': 'present'}, admin_user)
    assert not result.get('failed', False), result.get('msg', result)
    # Defaulting to True means a result lacking 'changed' fails this check.
    assert not result.get('changed', True), result
    wfjt.refresh_from_db()
    assert wfjt.survey_spec == survey_spec

    # Edit the fixture dict in place; the stored copy is re-read from the
    # database below before being compared.
    survey_spec['description'] = 'changed description'

    result = run_module(
        'workflow_job_template', {'name': 'foo-workflow', 'organization': organization.name, 'survey_spec': survey_spec, 'state': 'present'}, admin_user
    )
    assert not result.get('failed', False), result.get('msg', result)
    # Default to False: with a default of True this assertion would pass
    # even if the module never reported 'changed' at all.
    assert result.get('changed', False), result
    wfjt.refresh_from_db()
    assert wfjt.survey_spec == survey_spec


@pytest.mark.django_db
def test_survey_spec_missing_field(run_module, admin_user, organization, survey_spec):
    """A survey spec without ``description`` makes the module fail.

    After confirming that a run without ``survey_spec`` leaves the existing
    template unchanged, the ``description`` key is removed from the spec and
    the module is expected to fail with a specific error message.
    """
    wfjt = WorkflowJobTemplate.objects.create(organization=organization, name='foo-workflow', survey_enabled=True, survey_spec=survey_spec)
    result = run_module('workflow_job_template', {'name': 'foo-workflow', 'organization': organization.name, 'state': 'present'}, admin_user)
    assert not result.get('failed', False), result.get('msg', result)
    # Defaulting to True means a result lacking 'changed' fails this check.
    assert not result.get('changed', True), result
    wfjt.refresh_from_db()
    assert wfjt.survey_spec == survey_spec

    # Remove the field in place so the spec passed to the module is incomplete.
    del survey_spec['description']

    result = run_module(
        'workflow_job_template', {'name': 'foo-workflow', 'organization': organization.name, 'survey_spec': survey_spec, 'state': 'present'}, admin_user
    )
    # Default to False: with a default of True this assertion would pass
    # even if the module never reported 'failed' at all.
    assert result.get('failed', False), result
    assert result.get('msg') == "Failed to update survey: Field 'description' is missing from survey spec."


@pytest.mark.django_db
def test_associate_only_on_success(run_module, admin_user, organization, project):
    """Setting success notification templates leaves error ones untouched.

    A template starts with ``nt1`` attached as an error notification. The
    module is then used to set ``nt2`` as the only success notification, and
    afterwards to clear the success list; in both cases the error list must
    still contain only ``nt1``.
    """
    wfjt = WorkflowJobTemplate.objects.create(
        organization=organization,
        name='foo-workflow',
    )
    create_kwargs = dict(
        notification_configuration={'url': 'http://www.example.com/hook', 'headers': {'X-Custom-Header': 'value123'}, 'password': 'bar'},
        notification_type='webhook',
        organization=organization,
    )
    nt1 = NotificationTemplate.objects.create(name='nt1', **create_kwargs)
    nt2 = NotificationTemplate.objects.create(name='nt2', **create_kwargs)

    wfjt.notification_templates_error.add(nt1)

    # test preservation of error NTs when success NTs are added
    result = run_module(
        'workflow_job_template', {'name': 'foo-workflow', 'organization': organization.name, 'notification_templates_success': ['nt2']}, admin_user
    )
    assert not result.get('failed', False), result.get('msg', result)
    # Default to False: with a default of True this assertion would pass
    # even if the module never reported 'changed' at all.
    assert result.get('changed', False), result

    assert list(wfjt.notification_templates_success.values_list('id', flat=True)) == [nt2.id]
    assert list(wfjt.notification_templates_error.values_list('id', flat=True)) == [nt1.id]

    # test removal to empty list
    result = run_module('workflow_job_template', {'name': 'foo-workflow', 'organization': organization.name, 'notification_templates_success': []}, admin_user)
    assert not result.get('failed', False), result.get('msg', result)
    assert result.get('changed', False), result

    assert list(wfjt.notification_templates_success.values_list('id', flat=True)) == []
    assert list(wfjt.notification_templates_error.values_list('id', flat=True)) == [nt1.id]


@pytest.mark.django_db
def test_delete_with_spec(run_module, admin_user, organization, survey_spec):
    """Deleting a template that has a survey spec removes it from the database.

    Creates a template with a survey enabled, runs the module with
    ``state: absent`` and checks that the run reports ``changed`` and that
    no template with that name remains in the organization.
    """
    WorkflowJobTemplate.objects.create(organization=organization, name='foo-workflow', survey_enabled=True, survey_spec=survey_spec)
    result = run_module('workflow_job_template', {'name': 'foo-workflow', 'organization': organization.name, 'state': 'absent'}, admin_user)
    assert not result.get('failed', False), result.get('msg', result)
    # Default to False: with a default of True this assertion would pass
    # even if the module never reported 'changed' at all.
    assert result.get('changed', False), result

    # Only existence matters here, so ask for that rather than a row count.
    assert not WorkflowJobTemplate.objects.filter(name='foo-workflow', organization=organization).exists()