File: //lib/python3.9/site-packages/ansible_collections/cisco/nxos/plugins/httpapi/nxos.py
# (c) 2018 Red Hat Inc.
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
# Make every class defined in this module a new-style class when the file is
# run under Python 2 (no effect on Python 3).
__metaclass__ = type
# Plugin documentation kept as YAML text: author, plugin name, short and long
# description, and the version in which the plugin was added.
DOCUMENTATION = """
author: Ansible Networking Team (@ansible-network)
name: nxos
short_description: Use NX-API to run commands on Cisco NX-OS platform
description:
- This plugin provides low level abstraction APIs for sending and receiving
commands using NX-API with devices running Cisco NX-OS.
version_added: 1.0.0
"""
import collections
import json
import re
from ansible.module_utils._text import to_text
from ansible.module_utils.connection import ConnectionError
from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.utils import to_list
from ansible_collections.ansible.netcommon.plugins.plugin_utils.httpapi_base import HttpApiBase
# Accepted values for each option; HttpApi.get_capabilities() merges this
# mapping into the capabilities document it returns.
OPTIONS = dict(
    format=["text", "json"],
    diff_match=["line", "strict", "exact", "none"],
    diff_replace=["line", "block", "config"],
    output=["text", "json"],
)
class HttpApi(HttpApiBase):
    """Send CLI commands to an NX-OS device by POSTing NX-API requests to ``/ins``."""

    def __init__(self, *args, **kwargs):
        super(HttpApi, self).__init__(*args, **kwargs)
        # Scratch storage written by save_module_context() and read back by
        # read_module_context().
        self._module_context = {}
        # Populated by the first get_device_info() call and reused afterwards.
        self._device_info = None

    def read_module_context(self, module_key):
        """Return the context stored under ``module_key``.

        ``None`` is returned when the key is missing or the stored value is
        falsy.
        """
        stored = self._module_context.get(module_key)
        return stored if stored else None

    def save_module_context(self, module_key, module_context):
        """Store ``module_context`` under ``module_key``; returns ``None``."""
        self._module_context[module_key] = module_context

    def send_request(self, data, **message_kwargs):
        """Run one or more commands and return their outputs.

        Each item of ``data`` is either a command string or a dict with a
        ``command`` key and an optional ``output`` key. Items without their
        own ``output`` use ``message_kwargs["output"]``, falling back to
        ``"text"``. A command ending in ``| json`` is sent without that
        suffix and with the ``json`` output format.

        Consecutive commands that share an output format are sent together in
        one request. When exactly one response was collected it is returned
        directly; otherwise the list of responses is returned.
        """
        default_format = message_kwargs.get("output") or "text"
        collected = []
        pending = []
        pending_format = None

        for entry in to_list(data):
            if isinstance(entry, dict):
                command = entry["command"]
                wanted = entry["output"] if "output" in entry else default_format
            else:
                command, wanted = entry, default_format

            # Emulate '| json' from CLI
            if command.endswith("| json"):
                command, wanted = command.rsplit("|", 1)[0], "json"

            # The output format changed: flush what has been batched so far.
            if pending_format and pending_format != wanted:
                collected.extend(self._run_queue(pending, pending_format))
                pending = []

            pending_format = wanted
            pending.append(command)

        if pending:
            collected.extend(self._run_queue(pending, pending_format))

        return collected[0] if len(collected) == 1 else collected

    def _run_queue(self, queue, output):
        """POST the batched commands in ``queue`` and return the parsed results.

        Raises ``ConnectionError`` when the reply body is not valid JSON.
        """
        if self._become:
            self.connection.queue_message(
                "warning",
                "become has no effect over httpapi. Use network_cli if privilege escalation is required",
            )

        payload = request_builder(queue, output)
        _response, reply_buffer = self.connection.send(
            "/ins",
            payload,
            headers={"Content-Type": "application/json"},
            method="POST",
        )

        try:
            decoded = json.loads(to_text(reply_buffer.getvalue()))
        except ValueError:
            raise ConnectionError(
                "Response was not valid JSON, got {0}".format(to_text(reply_buffer.getvalue())),
            )

        return handle_response(decoded)

    def get_device_info(self):
        """Return a dict describing the device, querying it only once.

        The values are scraped from the output of ``show version`` and
        ``show inventory``; a key is only present when its pattern matched.
        """
        if self._device_info:
            return self._device_info

        info = {"network_os": "nxos"}
        version_text, inventory_text = self.send_request(["show version", "show inventory"])

        # (key, text to search, candidate patterns). For each key the first
        # pattern that matches wins. The row order fixes the key order of the
        # resulting dict.
        lookups = (
            (
                "network_os_version",
                version_text,
                (
                    r"\s+system:\s+version\s*(\S+)",
                    r"\s+kickstart:\s+version\s*(\S+)",
                    r"\s+NXOS:\s+version\s*(\S+)",
                ),
            ),
            (
                "network_os_model",
                version_text,
                (r"Hardware\n\s+cisco\s*(\S+\s+\S+)",),
            ),
            (
                "network_os_hostname",
                version_text,
                (r"\s+Device name:\s*(\S+)",),
            ),
            (
                "network_os_image",
                version_text,
                (
                    r"\s+system image file is:\s*(\S+)",
                    r"\s+kickstart image file is:\s*(\S+)",
                    r"\s+NXOS image file is:\s*(\S+)",
                ),
            ),
            (
                "network_os_platform",
                inventory_text,
                (r'NAME: (?:"Chassis"| Chassis ),\s*DESCR:.*\nPID:\s*(\S+)',),
            ),
        )

        for key, text, patterns in lookups:
            for pattern in patterns:
                found = re.search(pattern, text, re.M)
                if found:
                    info[key] = found.group(1)
                    break

        self._device_info = info
        return self._device_info

    def get_device_operations(self):
        """Return the operation flags for the device.

        Every flag is fixed except ``supports_replace``, which is true only
        when the detected platform string contains ``9K``.
        """
        platform = self.get_device_info().get("network_os_platform", "")
        replace_supported = "9K" in platform
        return {
            "supports_diff_replace": True,
            "supports_commit": False,
            "supports_rollback": False,
            "supports_defaults": True,
            "supports_onbox_diff": False,
            "supports_commit_comment": False,
            "supports_multiline_delimiter": False,
            "supports_diff_match": True,
            "supports_diff_ignore_lines": True,
            "supports_generate_diff": True,
            "supports_replace": replace_supported,
        }

    def get_capabilities(self):
        """Return the capabilities document as a JSON string.

        It combines an empty ``rpc`` list, the device info, the device
        operations, the module-level ``OPTIONS`` and the ``network_api`` name.
        """
        capabilities = {
            "rpc": [],
            "device_info": self.get_device_info(),
            "device_operations": self.get_device_operations(),
        }
        capabilities.update(OPTIONS)
        capabilities["network_api"] = "nxapi"
        return json.dumps(capabilities)

    # Shims for resource module support
    def get(self, command, output=None):
        """Run ``command`` through send_request() with the given ``output``."""
        # Exists solely for resource modules, which is why most of the usual
        # arguments are neither accepted nor supported.
        return self.send_request(data=command, output=output)

    def edit_config(self, candidate):
        """Send ``candidate`` as configuration and return the non-empty replies.

        Replies equal to the string ``"{}"`` are dropped from the result.
        """
        # Exists solely for resource modules, which is why most of the usual
        # arguments are neither accepted nor supported.
        replies = to_list(self.send_request(candidate, output="config"))
        return [reply for reply in replies if reply != "{}"]
def handle_response(response):
    """Extract the command bodies from a decoded NX-API reply.

    Returns a list with one stripped string per output entry that carries a
    ``body``; dict bodies are serialized to JSON text first. The list is empty
    when the reply has no ``outputs``.

    Raises ``ConnectionError`` (with the entry's ``code``) for the first
    output entry whose ``code`` is not ``"200"``.
    """
    outputs = response["ins_api"].get("outputs")
    if not outputs:
        return []

    bodies = []
    for entry in to_list(outputs["output"]):
        status = entry["code"]
        if status != "200":
            # Best effort messages: some API output keys may not exist on some platforms
            details = (
                entry.get("input", ""),
                entry.get("msg", ""),
                entry.get("clierror", ""),
            )
            raise ConnectionError("%s: %s: %s" % details, code=status)

        if "body" not in entry:
            continue

        body = entry["body"]
        if isinstance(body, dict):
            body = json.dumps(body)
        bodies.append(body.strip())

    return bodies
def _strip_output_suffix(command, suffix):
    """Return ``command`` without a trailing ``| <suffix>`` pipe, whitespace-stripped."""
    head, pipe, tail = command.rpartition("|")
    if pipe and tail.strip() == suffix:
        return head.strip()
    return command.strip()


def request_builder(commands, output, version="1.0", chunk="0", sid=None):
    """Encodes a NXAPI JSON request message

    The request type is normally chosen from ``output``. When ``commands`` is
    a sequence and the last ``|``-separated part of its first command is
    itself a known output name (for example ``show version | json``), that
    name chooses the request type instead. The trailing ``| <name>`` is then
    removed from each command that ends with it; any other pipe filters are
    left in place.

    :param commands: one command string, or a list/tuple/set of command
        strings. A sequence is joined with `` ;`` into a single ``input``
        value; a bare string is used as given. An empty sequence produces an
        empty ``input``.
    :param output: ``text``, ``json``, ``bash`` or ``config``.
    :param version: value of the ``version`` field.
    :param chunk: value of the ``chunk`` field.
    :param sid: value of the ``sid`` field.
    :returns: the request serialized as a JSON string.
    :raises ConnectionError: when no output suffix was detected and
        ``output`` is not one of the known output names.
    """
    output_to_command_type = {
        "text": "cli_show_ascii",
        "json": "cli_show",
        "bash": "bash",
        "config": "cli_conf",
    }

    is_sequence = isinstance(commands, (list, set, tuple))
    if is_sequence:
        # Sets cannot be indexed; materialize so every sequence type behaves
        # like a list below.
        commands = list(commands)

    # Only a non-empty sequence is inspected for an output suffix. A bare
    # string never matched before (only its first character was looked at),
    # so it is passed through untouched.
    maybe_output = None
    if is_sequence and commands:
        maybe_output = commands[0].split("|")[-1].strip()

    if maybe_output in output_to_command_type:
        command_type = output_to_command_type[maybe_output]
        # Drop only the trailing output suffix. Cutting at the first pipe
        # would silently discard filters such as '| include ...'.
        commands = [_strip_output_suffix(command, maybe_output) for command in commands]
    else:
        try:
            command_type = output_to_command_type[output]
        except KeyError:
            msg = "invalid format, received %s, expected one of %s" % (
                output,
                ",".join(output_to_command_type.keys()),
            )
            raise ConnectionError(msg)

    if is_sequence:
        commands = " ;".join(commands)

    # Order should not matter but some versions of NX-OS software fail
    # to process the payload properly if 'input' gets serialized before
    # 'type' and the payload of 'input' contains the word 'type'.
    msg = collections.OrderedDict()
    msg["version"] = version
    msg["type"] = command_type
    msg["chunk"] = chunk
    msg["sid"] = sid
    msg["input"] = commands
    msg["output_format"] = "json"

    return json.dumps(dict(ins_api=msg))