File: //lib/python3.9/site-packages/ansible_collections/check_point/mgmt/plugins/httpapi/checkpoint.py
# (c) 2018 Red Hat Inc.
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
DOCUMENTATION = """
---
author: Ansible Networking Team (@rcarrillocruz)
name: checkpoint
short_description: HttpApi Plugin for Checkpoint devices
description:
- This HttpApi plugin provides methods to connect to Checkpoint
devices over a HTTP(S)-based api.
version_added: "2.8.0"
options:
domain:
type: str
description:
- Specifies the domain of the Check Point device
vars:
- name: ansible_checkpoint_domain
api_key:
type: str
description:
- Login with api-key instead of user & password
vars:
- name: ansible_api_key
cloud_mgmt_id:
type: str
description:
- The Cloud Management ID
vars:
- name: ansible_cloud_mgmt_id
"""
import json
from ansible.module_utils.basic import to_text
from ansible.errors import AnsibleConnectionFailure
from ansible.module_utils.six.moves.urllib.error import HTTPError
from ansible.plugins.httpapi import HttpApiBase
from ansible.module_utils.connection import ConnectionError
BASE_HEADERS = {
'Content-Type': 'application/json',
'User-Agent': 'Ansible',
}
class HttpApi(HttpApiBase):
def login(self, username, password):
payload = {}
cp_domain = self.get_option('domain')
cp_api_key = self.get_option('api_key')
if cp_domain:
payload['domain'] = cp_domain
if username and password and not cp_api_key:
payload['user'] = username
payload['password'] = password
elif cp_api_key and not username and not password:
payload['api-key'] = cp_api_key
else:
raise AnsibleConnectionFailure('[Username and password] or api_key are required for login')
url = '/web_api/login'
response, response_data = self.send_request(url, payload)
try:
self.connection._auth = {'X-chkp-sid': response_data['sid']}
except KeyError:
raise ConnectionError(
'Server returned response without token info during connection authentication: %s' % response)
# Case of read-only
if 'uid' in response_data.keys():
self.connection._session_uid = response_data['uid']
def logout(self):
url = '/web_api/logout'
response, dummy = self.send_request(url, None)
def get_session_uid(self):
return self.connection._session_uid
def send_request(self, path, body_params):
data = json.dumps(body_params) if body_params else '{}'
cp_cloud_mgmt_id = self.get_option('cloud_mgmt_id')
if cp_cloud_mgmt_id:
path = '/' + cp_cloud_mgmt_id + path
try:
self._display_request()
response, response_data = self.connection.send(path, data, method='POST', headers=BASE_HEADERS)
value = self._get_response_value(response_data)
return response.getcode(), self._response_to_json(value)
except AnsibleConnectionFailure as e:
return 404, e.message
except HTTPError as e:
error = json.loads(e.read())
return e.code, error
def _display_request(self):
self.connection.queue_message('vvvv', 'Web Services: %s %s' % ('POST', self.connection._url))
def _get_response_value(self, response_data):
return to_text(response_data.getvalue())
def _response_to_json(self, response_text):
try:
return json.loads(response_text) if response_text else {}
# JSONDecodeError only available on Python 3.5+
except ValueError:
raise ConnectionError('Invalid JSON response: %s' % response_text)