File: //lib/python3.9/site-packages/ansible_collections/junipernetworks/junos/plugins/cliconf/junos.py
#
# (c) 2017 Red Hat Inc.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import absolute_import, division, print_function
__metaclass__ = type
DOCUMENTATION = """
author: Ansible Networking Team (@ansible-network)
name: junos
short_description: Use junos cliconf to run command on Juniper Junos OS platform
description:
- This junos plugin provides low level abstraction apis for sending and receiving
CLI commands from Juniper Junos OS network devices.
version_added: 1.0.0
options:
config_commands:
description:
- Specifies a list of commands that can make configuration changes
to the target device.
- When `ansible_network_single_user_mode` is enabled, if a command sent
to the device is present in this list, the existing cache is invalidated.
version_added: 2.0.0
type: list
elements: str
default: []
vars:
- name: ansible_junos_config_commands
"""
import json
import re
from functools import wraps
from itertools import chain
from ansible.errors import AnsibleConnectionFailure
from ansible.module_utils._text import to_text
from ansible.module_utils.common._collections_compat import Mapping
from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.utils import to_list
from ansible_collections.ansible.netcommon.plugins.plugin_utils.cliconf_base import CliconfBase
def configure(func):
@wraps(func)
def wrapped(self, *args, **kwargs):
prompt = self._connection.get_prompt()
if not to_text(prompt, errors="surrogate_or_strict").strip().endswith("#"):
self.send_command("configure")
return func(self, *args, **kwargs)
return wrapped
class Cliconf(CliconfBase):
def __init__(self, *args, **kwargs):
self._device_info = {}
super(Cliconf, self).__init__(*args, **kwargs)
def get_text(self, ele, tag):
try:
return to_text(
ele.find(tag).text,
errors="surrogate_then_replace",
).strip()
except AttributeError:
pass
def get_device_info(self):
if not self._device_info:
device_info = {}
device_info["network_os"] = "junos"
reply = self.get(command="show version")
data = to_text(reply, errors="surrogate_or_strict").strip()
match = re.search(r"Junos: (\S+)", data)
if match:
device_info["network_os_version"] = match.group(1)
match = re.search(r"Model: (\S+)", data, re.M)
if match:
device_info["network_os_model"] = match.group(1)
match = re.search(r"Hostname: (\S+)", data, re.M)
if match:
device_info["network_os_hostname"] = match.group(1)
self._device_info = device_info
return self._device_info
def get_config(self, source="running", format="text", flags=None):
if source != "running":
raise ValueError(
"fetching configuration from %s is not supported" % source,
)
options_values = self.get_option_values()
if format not in options_values["format"]:
raise ValueError(
"'format' value %s is invalid. Valid values are %s"
% (format, ",".join(options_values["format"])),
)
if format == "text":
cmd = "show configuration"
else:
cmd = "show configuration | display %s" % format
cmd += " ".join(to_list(flags))
cmd = cmd.strip()
return self.send_command(cmd)
@configure
def edit_config(
self,
candidate=None,
commit=True,
replace=None,
comment=None,
):
operations = self.get_device_operations()
self.check_edit_config_capability(
operations,
candidate,
commit,
replace,
comment,
)
resp = {}
results = []
requests = []
if replace:
candidate = "load override {0}".format(replace)
for line in to_list(candidate):
if not isinstance(line, Mapping):
line = {"command": line}
cmd = line["command"]
try:
results.append(self.send_command(**line))
except AnsibleConnectionFailure as exc:
if "error: commit failed" in exc.message:
self.discard_changes()
raise
requests.append(cmd)
diff = self.compare_configuration()
if diff:
resp["diff"] = diff
if commit:
self.commit(comment=comment)
else:
self.discard_changes()
else:
self.send_command("top")
self.discard_changes()
resp["request"] = requests
resp["response"] = results
return resp
def get(
self,
command,
prompt=None,
answer=None,
sendonly=False,
output=None,
newline=True,
check_all=False,
):
if output:
command = self._get_command_with_output(command, output)
return self.send_command(
command=command,
prompt=prompt,
answer=answer,
sendonly=sendonly,
newline=newline,
check_all=check_all,
)
@configure
def commit(
self,
comment=None,
confirmed=False,
at_time=None,
synchronize=False,
):
"""
Execute commit command on remote device.
:param comment: Comment to be associated with commit
:param confirmed: Boolean flag to indicate if the previous commit should confirmed
:param at_time: Time at which to activate configuration changes
:param synchronize: Boolean flag to indicate if commit should synchronize on remote peers
:return: Command response received from device
"""
command = "commit"
if comment:
command += " comment {0}".format(comment)
if confirmed:
command += " confirmed"
if at_time:
command += " {0}".format(at_time)
if synchronize:
command += " peers-synchronize"
command += " and-quit"
try:
response = self.send_command(command)
except AnsibleConnectionFailure:
self.discard_changes()
raise
return response
@configure
def discard_changes(self):
command = "rollback 0"
for cmd in chain(to_list(command), ["exit"]):
self.send_command(cmd)
@configure
def validate(self):
return self.send_command("commit check")
@configure
def compare_configuration(self, rollback_id=None):
command = "show | compare"
if rollback_id is not None:
command += " rollback %s" % int(rollback_id)
resp = self.send_command(command)
r = resp.splitlines()
if len(r) == 1 and "[edit]" in r[0] or len(r) == 4 and r[1].startswith("- version"):
resp = ""
return resp
@configure
def rollback(self, rollback_id, commit=True):
resp = {}
self.send_command("rollback %s" % int(rollback_id))
resp["diff"] = self.compare_configuration()
if commit:
self.commit()
else:
self.discard_changes()
return resp
def get_diff(self, rollback_id=None):
diff = {"config_diff": None}
response = self.compare_configuration(rollback_id=rollback_id)
if response:
diff["config_diff"] = response
return diff
def get_device_operations(self):
return {
"supports_diff_replace": False,
"supports_commit": True,
"supports_rollback": True,
"supports_defaults": False,
"supports_onbox_diff": True,
"supports_commit_comment": True,
"supports_multiline_delimiter": False,
"supports_diff_match": False,
"supports_diff_ignore_lines": False,
"supports_generate_diff": False,
"supports_replace": True,
}
def get_option_values(self):
return {
"format": ["text", "set", "xml", "json"],
"diff_match": [],
"diff_replace": [],
"output": ["text", "set", "xml", "json"],
}
def get_capabilities(self):
result = super(Cliconf, self).get_capabilities()
result["rpc"] += [
"commit",
"discard_changes",
"run_commands",
"compare_configuration",
"validate",
"get_diff",
]
result["device_operations"] = self.get_device_operations()
result.update(self.get_option_values())
return json.dumps(result)
def set_cli_prompt_context(self):
"""
Make sure we are in the operational cli mode
:return: None
"""
if self._connection.connected:
self._update_cli_prompt_context(config_context="#")
def _get_command_with_output(self, command, output):
options_values = self.get_option_values()
if output not in options_values["output"]:
raise ValueError(
"'output' value %s is invalid. Valid values are %s"
% (output, ",".join(options_values["output"])),
)
if output == "json" and not command.endswith("| display json"):
cmd = "%s | display json" % command
elif output == "xml" and not command.endswith("| display xml"):
cmd = "%s | display xml" % command
elif output == "text" and (
command.endswith("| display json") or command.endswith("| display xml")
):
cmd = command.rsplit("|", 1)[0]
else:
cmd = command
return cmd