File: //lib/python3.9/site-packages/ansible_collections/wti/remote/plugins/lookup/cpm_iptables_config.py
# -*- coding: utf-8 -*-
#
# (C) 2019 Red Hat Inc.
# Copyright (C) 2019 Western Telematic Inc.
#
# GNU General Public License v3.0+
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
#
# Module to configure WTI network IPTables Parameters on WTI OOB and PDU devices.
# CPM remote_management
#
from __future__ import absolute_import, division, print_function
# Python 2 compatibility: classes defined in this module become new-style.
__metaclass__ = type

# Metadata describing this module's metadata format version, status and
# support channel. Kept as a plain literal dict.
ANSIBLE_METADATA = {
    'metadata_version': '1.1',
    'status': ['preview'],
    'supported_by': 'community',
}
DOCUMENTATION = """
---
module: cpm_iptables_config
version_added: "2.10.0"
author:
- "Western Telematic Inc. (@wtinetworkgear)"
short_description: Set network IPTables parameters in WTI OOB and PDU devices
description:
- "Set network IPTables parameters in WTI OOB and PDU devices"
options:
cpm_url:
description:
- This is the URL of the WTI device to send the module.
type: str
required: true
cpm_username:
description:
- This is the Username of the WTI device to send the module.
type: str
required: true
cpm_password:
description:
- This is the Password of the WTI device to send the module.
type: str
required: true
use_https:
description:
- Designates to use an https connection or http connection.
type: bool
required: false
default: true
validate_certs:
description:
- If false, SSL certificates will not be validated. This should only be used
- on personally controlled sites using self-signed certificates.
type: bool
required: false
default: true
use_proxy:
description:
- Flag to control if the lookup will observe HTTP proxy environment variables when present.
type: bool
required: false
default: false
protocol:
description:
- The protocol that the iptables entry should be applied. 0 = ipv4, 1 = ipv6.
type: int
required: false
choices: [ 0, 1 ]
index:
description:
- Index in which command should be inserted. If not defined entry will start at position one.
type: list
required: false
elements: raw
command:
description:
- Actual iptables command to send to the WTI device.
type: list
required: true
elements: raw
clear:
description:
- Removes all the iptables for the protocol being defined before setting the newly defined entry.
type: int
required: false
choices: [ 0, 1 ]
notes:
- Use C(groups/cpm) in C(module_defaults) to set common options used between CPM modules.
"""
EXAMPLES = """
# Set Network IPTables Parameters
- name: Set an IPTables Parameter for a WTI device
cpm_iptables_config:
cpm_url: "nonexist.wti.com"
cpm_username: "super"
cpm_password: "super"
use_https: true
validate_certs: false
command: "iptables -A INPUT -p tcp -m state --state NEW -m tcp --dport 443 -j ACCEPT"
# Sets multiple Network IPTables Parameters
- name: Set the IPTables Parameters for a WTI device
cpm_iptables_config:
cpm_url: "nonexist.wti.com"
cpm_username: "super"
cpm_password: "super"
use_https: true
validate_certs: false
index:
- 1
- 2
command:
- "iptables -A INPUT -p tcp -m state --state NEW -m tcp --dport 443 -j ACCEPT"
- "iptables -A INPUT -p tcp -m state --state NEW -m tcp --dport 22 -j ACCEPT"
"""
RETURN = """
data:
description: The output JSON returned from the commands sent
returned: always
type: complex
contains:
iptables:
description: Current k/v pairs of interface info for the WTI device after module execution.
returned: always
type: list
sample: [{ "eth0": { "ietf-ipv4": { "clear": 1, "entries": [
{ "entry": "iptables -A INPUT -p tcp -m state --state NEW -m tcp --dport 443 -j ACCEPT", "index": "1" },
{"entry": "iptables -A INPUT -p tcp -m state --state NEW -m tcp --dport 22 -j ACCEPT", "index": "2" }]}}}]
"""
import base64
import json
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils._text import to_text, to_bytes, to_native
from ansible.module_utils.six.moves.urllib.error import HTTPError, URLError
from ansible.module_utils.urls import open_url, ConnectionError, SSLValidationError
def assemble_json(cpmmodule, existing_interface):
    """Build the JSON body that updates the device's iptables entries.

    Parameters:
        cpmmodule: module object whose ``params`` mapping supplies ``clear``,
            ``protocol``, ``index`` and ``command``.
        existing_interface: decoded current configuration. It is read as
            ``existing_interface["iptables"][0]["eth0"][<family>]["entries"]``,
            a list whose items carry an ``"entry"`` string.

    Returns:
        ``None`` when ``index`` is supplied but its length differs from the
        number of commands; an empty string when no entry differs from the
        current configuration and ``clear`` is not set; otherwise the JSON
        payload as a string.
    """
    # Only slots 1..48 are ever considered (presumably the device limit --
    # TODO confirm against the WTI API documentation).
    max_entries = 48

    params = cpmmodule.params

    is_clear = 0
    if params['clear'] is not None:
        is_clear = int(params['clear'])

    ietfstring = "ietf-ipv4"
    if params['protocol'] is not None and int(params['protocol']) == 1:
        ietfstring = "ietf-ipv6"

    # User-facing indices are 1-based; convert them to 0-based slots.
    index = params['index']
    slots = []
    if isinstance(index, list):
        slots = [int(to_native(x)) - 1 for x in index]

    command = params['command']
    entries = []
    if isinstance(command, list):
        entries = [to_native(x) for x in command]

    if slots:
        # Validate BEFORE placing anything: a count mismatch is reported as
        # None in both directions instead of crashing when there are more
        # commands than indices.
        if len(slots) != len(entries):
            return None
    else:
        # No explicit indices: commands fill consecutive slots from the first.
        slots = list(range(len(entries)))

    # Assign (rather than list.insert) so that a later, lower index cannot
    # shift an already placed command into a slot the user never asked for.
    # Slots outside 1..48 are ignored.
    commands = [None] * max_entries
    for slot, entry in zip(slots, entries):
        if 0 <= slot < max_entries:
            commands[slot] = entry

    changed = []
    existing_entries = None
    for slot, entry in enumerate(commands):
        if entry is None:
            continue
        if existing_entries is None:
            # Looked up lazily so the existing configuration is only touched
            # when there is at least one command to compare.
            existing_entries = existing_interface["iptables"][0]["eth0"][ietfstring]["entries"]
        # A slot the device does not report yet counts as "different".
        current = existing_entries[slot]["entry"] if slot < len(existing_entries) else None
        if (current != entry) or (is_clear == 1):
            # json.dumps escapes quotes/backslashes inside the command so the
            # payload stays valid JSON; plain commands serialise unchanged.
            changed.append('{"entry": %s,"index": "%d"}' % (json.dumps(entry), slot + 1))

    json_load = ",".join(changed)
    if changed or (is_clear > 0):
        json_load = '{"iptables": [{"eth0": { "%s": { "clear": %d, "entries": [ %s ]}}}]}' % (ietfstring, is_clear, json_load)
    return json_load
def _cpm_request(module, url, method, auth, payload=None):
    """Send one Basic-authenticated JSON request to the device.

    Parameters:
        module: the AnsibleModule instance; supplies ``validate_certs`` and
            ``use_proxy`` and is used to report failures.
        url: full URL of the REST endpoint.
        method: HTTP method name; also used as the prefix of error messages.
        auth: base64 encoded ``user:password`` credentials.
        payload: request body, or ``None`` for requests without a body.

    Returns:
        The response object returned by ``open_url``.

    Every transport error is reported through ``module.fail_json``; the
    callers rely on ``fail_json`` not returning.
    """
    try:
        return open_url(url, data=payload, method=method, validate_certs=module.params['validate_certs'],
                        use_proxy=module.params['use_proxy'],
                        headers={'Content-Type': 'application/json', 'Authorization': "Basic %s" % auth})
    except HTTPError as e:
        module.fail_json(msg='{0}: Received HTTP error for {1} : {2}'.format(method, url, to_native(e)), changed=False)
    except URLError as e:
        module.fail_json(msg='{0}: Failed lookup url for {1} : {2}'.format(method, url, to_native(e)), changed=False)
    except SSLValidationError as e:
        module.fail_json(msg="{0}: Error validating the server's certificate for {1} : {2}".format(method, url, to_native(e)),
                         changed=False)
    except ConnectionError as e:
        module.fail_json(msg='{0}: Error connecting to {1} : {2}'.format(method, url, to_native(e)), changed=False)


def run_module():
    """Read the device's iptables configuration and update it when needed.

    The current configuration is fetched with a GET, ``assemble_json``
    computes the update payload, and a non-empty payload is POSTed back
    unless the module runs in check mode. The result is returned through
    ``module.exit_json`` with ``changed`` and ``data`` keys.
    """
    # define the available arguments/parameters that a user can pass to
    # the module
    module_args = dict(
        cpm_url=dict(type='str', required=True),
        cpm_username=dict(type='str', required=True),
        cpm_password=dict(type='str', required=True, no_log=True),
        protocol=dict(type='int', required=False, default=0, choices=[0, 1]),
        index=dict(type='list', elements='raw', required=False, default=None),
        command=dict(type='list', elements='raw', required=True),
        clear=dict(type='int', required=False, default=None, choices=[0, 1]),
        use_https=dict(type='bool', default=True),
        validate_certs=dict(type='bool', default=True),
        use_proxy=dict(type='bool', default=False)
    )

    result = dict(
        changed=False,
        data=''
    )

    module = AnsibleModule(argument_spec=module_args, supports_check_mode=True)

    # Credentials for the HTTP Basic "Authorization" header.
    auth = to_text(base64.b64encode(to_bytes('{0}:{1}'.format(to_native(module.params['cpm_username']), to_native(module.params['cpm_password'])),
                                             errors='surrogate_or_strict')))

    if module.params['use_https'] is True:
        transport = "https://"
    else:
        transport = "http://"

    # The same endpoint serves both the read (GET) and the update (POST).
    fullurl = ("%s%s/api/v2/config/iptables" % (transport, to_native(module.params['cpm_url'])))

    response = _cpm_request(module, fullurl, 'GET', auth)
    result['data'] = json.loads(response.read())

    # None (index/command count mismatch) or "" (nothing to change) both
    # mean that no request has to be sent.
    payload = assemble_json(module, result['data'])
    if (payload is not None) and (len(payload) > 0):
        if module.check_mode:
            # Check mode only reports that a change would be made.
            result['changed'] = True
        else:
            response = _cpm_request(module, fullurl, 'POST', auth, payload)
            result['changed'] = True
            result['data'] = json.loads(response.read())

    module.exit_json(**result)
def main():
    """Entry point: delegate to run_module()."""
    run_module()
# Run the module only when this file is executed as a script.
if __name__ == "__main__":
    main()