From 96d84e67145a0dddec07ce6217fcbc13cbdb09bf Mon Sep 17 00:00:00 2001 From: Fede Barcelona Date: Tue, 28 Sep 2021 13:29:30 +0200 Subject: [PATCH] feat: Add ScanningAlertsClientV1 with support for Scanning Alerts (#200) * feat(scanning): Add support for runtime alert creation * feat(scanning): Add support for repository alert creation * ci(scanning): Add test that retrieves an alert * feat(scanning): Add support for runtime alert update * feat(scanning): Add support for repository alert update * refactor(scanning): Extract ScanningAlertsClientV1 client * docs: Add usage examples * docs: Add SdScanningClient documentation autogeneration * fix(lint): Expand imports * lint: Fix linting errors and add EditorConfig --- .editorconfig | 129 +++++++++ .flake8 | 2 +- docs/reference/secure.rst | 6 + sdcclient/__init__.py | 2 +- sdcclient/_scanning.py | 331 ++++++++++------------ sdcclient/secure/__init__.py | 7 +- sdcclient/secure/scanning/__init__.py | 3 + sdcclient/secure/scanning/_alerts.py | 381 ++++++++++++++++++++++++++ specs/secure/scanning/alerts_spec.py | 243 ++++++++++++++++ 9 files changed, 913 insertions(+), 191 deletions(-) create mode 100644 .editorconfig create mode 100644 sdcclient/secure/scanning/__init__.py create mode 100644 sdcclient/secure/scanning/_alerts.py create mode 100644 specs/secure/scanning/alerts_spec.py diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 00000000..125304ea --- /dev/null +++ b/.editorconfig @@ -0,0 +1,129 @@ +root = true + +[*] +charset = utf-8 +end_of_line = lf +indent_size = 4 +indent_style = space +insert_final_newline = true +max_line_length = 120 +tab_width = 4 +ij_continuation_indent_size = 8 +ij_formatter_off_tag = @formatter:off +ij_formatter_on_tag = @formatter:on +ij_formatter_tags_enabled = false +ij_smart_tabs = false +ij_visual_guides = none +ij_wrap_on_typing = false + +[{*.har,*.jsb2,*.jsb3,*.json,.babelrc,.eslintrc,.stylelintrc,bowerrc,jest.config}] +indent_size = 2 +ij_json_keep_blank_lines_in_code = 0 +ij_json_keep_indents_on_empty_lines = false +ij_json_keep_line_breaks = true +ij_json_space_after_colon = true +ij_json_space_after_comma = true +ij_json_space_before_colon = true +ij_json_space_before_comma = false +ij_json_spaces_within_braces = false +ij_json_spaces_within_brackets = false +ij_json_wrap_long_lines = false + +[{*.markdown,*.md}] +ij_markdown_force_one_space_after_blockquote_symbol = true +ij_markdown_force_one_space_after_header_symbol = true +ij_markdown_force_one_space_after_list_bullet = true +ij_markdown_force_one_space_between_words = true +ij_markdown_keep_indents_on_empty_lines = false +ij_markdown_max_lines_around_block_elements = 1 +ij_markdown_max_lines_around_header = 1 +ij_markdown_max_lines_between_paragraphs = 1 +ij_markdown_min_lines_around_block_elements = 1 +ij_markdown_min_lines_around_header = 1 +ij_markdown_min_lines_between_paragraphs = 1 + +[{*.py,*.pyw}] +max_line_length = 999 +ij_python_align_collections_and_comprehensions = true +ij_python_align_multiline_imports = true +ij_python_align_multiline_parameters = true +ij_python_align_multiline_parameters_in_calls = true +ij_python_blank_line_at_file_end = true +ij_python_blank_lines_after_imports = 1 +ij_python_blank_lines_after_local_imports = 0 +ij_python_blank_lines_around_class = 1 +ij_python_blank_lines_around_method = 1 +ij_python_blank_lines_around_top_level_classes_functions = 2 +ij_python_blank_lines_before_first_method = 0 +ij_python_call_parameters_new_line_after_left_paren = false +ij_python_call_parameters_right_paren_on_new_line = false +ij_python_call_parameters_wrap = normal +ij_python_dict_alignment = 1 +ij_python_dict_new_line_after_left_brace = false +ij_python_dict_new_line_before_right_brace = false +ij_python_dict_wrapping = 1 +ij_python_from_import_new_line_after_left_parenthesis = false +ij_python_from_import_new_line_before_right_parenthesis = false +ij_python_from_import_parentheses_force_if_multiline = true +ij_python_from_import_trailing_comma_if_multiline = false +ij_python_from_import_wrapping = 1 +ij_python_hang_closing_brackets = false +ij_python_keep_blank_lines_in_code = 1 +ij_python_keep_blank_lines_in_declarations = 1 +ij_python_keep_indents_on_empty_lines = false +ij_python_keep_line_breaks = true +ij_python_method_parameters_new_line_after_left_paren = false +ij_python_method_parameters_right_paren_on_new_line = false +ij_python_method_parameters_wrap = normal +ij_python_new_line_after_colon = true +ij_python_new_line_after_colon_multi_clause = true +ij_python_optimize_imports_always_split_from_imports = false +ij_python_optimize_imports_case_insensitive_order = true +ij_python_optimize_imports_join_from_imports_with_same_source = true +ij_python_optimize_imports_sort_by_type_first = true +ij_python_optimize_imports_sort_imports = true +ij_python_optimize_imports_sort_names_in_from_imports = true +ij_python_space_after_comma = true +ij_python_space_after_number_sign = true +ij_python_space_after_py_colon = true +ij_python_space_before_backslash = true +ij_python_space_before_comma = false +ij_python_space_before_for_semicolon = false +ij_python_space_before_lbracket = false +ij_python_space_before_method_call_parentheses = false +ij_python_space_before_method_parentheses = false +ij_python_space_before_number_sign = true +ij_python_space_before_py_colon = false +ij_python_space_within_empty_method_call_parentheses = false +ij_python_space_within_empty_method_parentheses = false +ij_python_spaces_around_additive_operators = true +ij_python_spaces_around_assignment_operators = true +ij_python_spaces_around_bitwise_operators = true +ij_python_spaces_around_eq_in_keyword_argument = false +ij_python_spaces_around_eq_in_named_parameter = false +ij_python_spaces_around_equality_operators = true +ij_python_spaces_around_multiplicative_operators = true +ij_python_spaces_around_power_operator = true +ij_python_spaces_around_relational_operators = true +ij_python_spaces_around_shift_operators = true +ij_python_spaces_within_braces = false +ij_python_spaces_within_brackets = false +ij_python_spaces_within_method_call_parentheses = false +ij_python_spaces_within_method_parentheses = false +ij_python_use_continuation_indent_for_arguments = true +ij_python_use_continuation_indent_for_collection_and_comprehensions = true +ij_python_use_continuation_indent_for_parameters = true +ij_python_wrap_long_lines = false + +[{*.yaml,*.yml}] +indent_size = 4 +ij_yaml_align_values_properties = do_not_align +ij_yaml_autoinsert_sequence_marker = true +ij_yaml_block_mapping_on_new_line = false +ij_yaml_indent_sequence_value = true +ij_yaml_keep_indents_on_empty_lines = false +ij_yaml_keep_line_breaks = true +ij_yaml_sequence_on_new_line = false +ij_yaml_space_before_colon = false +ij_yaml_spaces_within_braces = true +ij_yaml_spaces_within_brackets = true diff --git a/.flake8 b/.flake8 index 87a7d626..a46d303e 100644 --- a/.flake8 +++ b/.flake8 @@ -1,5 +1,5 @@ [flake8] -ignore = E501, F821, W504, W605, E303 +ignore = E501, F821, W504, W605, E303, E126, E131, E241 show-source = True count = True statistics = True diff --git a/docs/reference/secure.rst b/docs/reference/secure.rst index b7a4a27c..49737237 100644 --- a/docs/reference/secure.rst +++ b/docs/reference/secure.rst @@ -6,3 +6,9 @@ Sysdig Secure :members: :inherited-members: :undoc-members: + +.. automodule:: sdcclient +.. autoclass:: SdScanningClient + :members: + :inherited-members: + :undoc-members: diff --git a/sdcclient/__init__.py b/sdcclient/__init__.py index c61b16b6..b431fdf1 100644 --- a/sdcclient/__init__.py +++ b/sdcclient/__init__.py @@ -1,6 +1,6 @@ import sdcclient.monitor as monitor import sdcclient.secure as secure -from sdcclient._monitor import SdMonitorClient, SdcClient +from sdcclient._monitor import SdcClient, SdMonitorClient from sdcclient._monitor_v1 import SdMonitorClientV1 from sdcclient._scanning import SdScanningClient from sdcclient._secure import SdSecureClient diff --git a/sdcclient/_scanning.py b/sdcclient/_scanning.py index 149a4d3e..24de165f 100644 --- a/sdcclient/_scanning.py +++ b/sdcclient/_scanning.py @@ -8,6 +8,8 @@ from requests.exceptions import RetryError from requests_toolbelt.multipart.encoder import MultipartEncoder +from sdcclient.secure.scanning import ScanningAlertsClientV1 + try: from urllib.parse import quote_plus, unquote_plus except ImportError: @@ -16,8 +18,7 @@ from sdcclient._common import _SdcCommon -class SdScanningClient(_SdcCommon): - +class SdScanningClient(ScanningAlertsClientV1, _SdcCommon): def __init__(self, token="", sdc_url='https://secure.sysdig.com', ssl_verify=True, custom_headers=None): super(SdScanningClient, self).__init__(token, sdc_url, ssl_verify, custom_headers) self.product = "SDS" @@ -47,9 +48,9 @@ def add_image(self, image, force=False, dockerfile=None, annotations={}, autosub payload['annotations'] = annotations url = "{base_url}/api/scanning/v1/anchore/images?autosubscribe={autosubscribe}{force}".format( - base_url=self.url, - autosubscribe=str(autosubscribe), - force="&force=true" if force else "") + base_url=self.url, + autosubscribe=str(autosubscribe), + force="&force=true" if force else "") res = self.http.post(url, data=json.dumps(payload), headers=self.hdrs, verify=self.ssl_verify) if not self._checkResponse(res): @@ -78,8 +79,8 @@ def get_image(self, image, show_history=False): url = self.url + "/api/scanning/v1/anchore/images" url += { - 'imageid': '/by_id/{}'.format(image), - 'imageDigest': '/{}'.format(image) + 'imageid': '/by_id/{}'.format(image), + 'imageDigest': '/{}'.format(image) }.get(itype, '') res = self.http.get(url, params=params, headers=self.hdrs, verify=self.ssl_verify) @@ -209,12 +210,12 @@ def query_images_by_vulnerability(self, vulnerability_id, namespace=None, packag A JSON object representing the images. ''' url = "{base_url}/api/scanning/v1/anchore/query/images/by_vulnerability?vulnerability_id={vulnerability_id}{namespace}{package}{severity}&vendor_only={vendor_only}".format( - base_url=self.url, - vulnerability_id=vulnerability_id, - namespace="&namespace={}".format(namespace) if namespace else "", - package="&affected_package={}".format(package) if package else "", - severity="&severity={}".format(severity) if severity else "", - vendor_only=vendor_only) + base_url=self.url, + vulnerability_id=vulnerability_id, + namespace="&namespace={}".format(namespace) if namespace else "", + package="&affected_package={}".format(package) if package else "", + severity="&severity={}".format(severity) if severity else "", + vendor_only=vendor_only) res = self.http.get(url, headers=self.hdrs, verify=self.ssl_verify) if not self._checkResponse(res): @@ -235,10 +236,10 @@ def query_images_by_package(self, name, version=None, package_type=None): A JSON object representing the images. ''' url = "{base_url}/api/scanning/v1/anchore/query/images/by_package?name={name}{version}{package_type}".format( - base_url=self.url, - name=name, - version="&version={}".format(version) if version else "", - package_type="&package_type={}".format(package_type) if package_type else "") + base_url=self.url, + name=name, + version="&version={}".format(version) if version else "", + package_type="&package_type={}".format(package_type) if package_type else "") res = self.http.get(url, headers=self.hdrs, verify=self.ssl_verify) if not self._checkResponse(res): @@ -255,11 +256,11 @@ def _query_image(self, image, query_group="", query_type="", vendor_only=True): return [False, "cannot use input image string (no discovered imageDigest)"] url = "{base_url}/api/scanning/v1/anchore/images/{image_digest}/{query_group}/{query_type}{vendor_only}".format( - base_url=self.url, - image_digest=image_digest, - query_group=query_group, - query_type=query_type if query_type else '', - vendor_only="?vendor_only={}".format(vendor_only) if query_group == 'vuln' else '') + base_url=self.url, + image_digest=image_digest, + query_group=query_group, + query_type=query_type if query_type else '', + vendor_only="?vendor_only={}".format(vendor_only) if query_group == 'vuln' else '') res = self.http.get(url, headers=self.hdrs, verify=self.ssl_verify) if not self._checkResponse(res): @@ -309,12 +310,12 @@ def check_image_evaluation(self, image, show_history=False, detail=False, tag=No url = "{base_url}/api/scanning/v1/anchore/images/{image_digest}/check?history={history}&detail={detail}&tag={tag}{policy_id}" url = url.format( - base_url=self.url, - image_digest=image_digest, - history=str(show_history).lower(), - detail=str(detail).lower(), - tag=thetag, - policy_id=("&policyId=%s" % policy) if policy else "") + base_url=self.url, + image_digest=image_digest, + history=str(show_history).lower(), + detail=str(detail).lower(), + tag=thetag, + policy_id=("&policyId=%s" % policy) if policy else "") res = self.http.get(url, headers=self.hdrs, verify=self.ssl_verify) if not self._checkResponse(res): @@ -343,10 +344,10 @@ def get_pdf_report(self, image, tag=None, date=None): image_tag = tag if tag else image url = "{base_url}/api/scanning/v1/images/{image_digest}/report?tag={tag}{at}".format( - base_url=self.url, - image_digest=image_digest, - tag=image_tag, - at=("&at=%s" % date) if date else "") + base_url=self.url, + image_digest=image_digest, + tag=image_tag, + at=("&at=%s" % date) if date else "") res = self.http.get(url, headers=self.hdrs, verify=self.ssl_verify) if not self._checkResponse(res): @@ -366,9 +367,9 @@ def get_latest_pdf_report_by_digest(self, image_digest, full_tag=None): The pdf content ''' url = "{base_url}/api/scanning/v1/images/{image_digest}/report?tag={tag}".format( - base_url=self.url, - image_digest=image_digest, - tag=full_tag) + base_url=self.url, + image_digest=image_digest, + tag=full_tag) res = self.http.get(url, headers=self.hdrs, verify=self.ssl_verify) if not self._checkResponse(res): @@ -390,7 +391,7 @@ def import_image(self, infile, image_id, digest_id, image_name, sync=False): ''' try: m = MultipartEncoder( - fields={'archive_file': (infile, open(infile, 'rb'), 'text/plain')} + fields={'archive_file': (infile, open(infile, 'rb'), 'text/plain')} ) if sync: url = self.url + "/api/scanning/v1/anchore/import/images" @@ -398,7 +399,7 @@ def import_image(self, infile, image_id, digest_id, image_name, sync=False): url = self.url + "/api/scanning/v1/import/images" headers = {'Authorization': 'Bearer ' + self.token, 'Content-Type': m.content_type, - 'imageId': image_id, 'digestId': digest_id, 'imageName': image_name} + 'imageId': image_id, 'digestId': digest_id, 'imageName': image_name} res = self.http.post(url, data=m, headers=headers, verify=self.ssl_verify) if not self._checkResponse(res): return [False, self.lasterr] @@ -438,10 +439,10 @@ def get_image_scan_result_by_id(self, image_id, full_tag_name, detail): A JSON object containing pass/fail status of image scan policy. ''' url = "{base_url}/api/scanning/v1/anchore/images/by_id/{image_id}/check?tag={full_tag_name}&detail={detail}".format( - base_url=self.url, - image_id=image_id, - full_tag_name=full_tag_name, - detail=detail) + base_url=self.url, + image_id=image_id, + full_tag_name=full_tag_name, + detail=detail) res = self.http.get(url, headers=self.hdrs, verify=self.ssl_verify) if not self._checkResponse(res): return [False, self.lasterr] @@ -475,14 +476,14 @@ def add_registry(self, registry, registry_user, registry_pass, insecure=False, r registry_type = self._get_registry_type(registry) payload = { - 'registry': registry, - 'registry_user': registry_user, - 'registry_pass': registry_pass, - 'registry_type': registry_type, - 'registry_verify': not insecure} + 'registry': registry, + 'registry_user': registry_user, + 'registry_pass': registry_pass, + 'registry_type': registry_type, + 'registry_verify': not insecure} url = "{base_url}/api/scanning/v1/anchore/registries?validate={validate}".format( - base_url=self.url, - validate=validate) + base_url=self.url, + validate=validate) res = self.http.post(url, data=json.dumps(payload), headers=self.hdrs, verify=self.ssl_verify) if not self._checkResponse(res): @@ -511,15 +512,15 @@ def update_registry(self, registry, registry_user, registry_pass, insecure=False "input registry name cannot contain '/' characters - valid registry names are of the form : where : is optional"] payload = { - 'registry': registry, - 'registry_user': registry_user, - 'registry_pass': registry_pass, - 'registry_type': registry_type, - 'registry_verify': not insecure} + 'registry': registry, + 'registry_user': registry_user, + 'registry_pass': registry_pass, + 'registry_type': registry_type, + 'registry_verify': not insecure} url = "{base_url}/api/scanning/v1/anchore/registries/{registry}?validate={validate}".format( - base_url=self.url, - registry=registry, - validate=validate) + base_url=self.url, + registry=registry, + validate=validate) res = self.http.put(url, data=json.dumps(payload), headers=self.hdrs, verify=self.ssl_verify) if not self._checkResponse(res): @@ -605,10 +606,10 @@ def add_repo(self, repo, autosubscribe=True, lookuptag=None): A JSON object representing the repo. ''' url = "{base_url}/api/scanning/v1/anchore/repositories?repository={repo}&autosubscribe={autosubscribe}{lookuptag}".format( - base_url=self.url, - repo=repo, - autosubscribe=autosubscribe, - lookuptag="&lookuptag={}".format(lookuptag) if lookuptag else "") + base_url=self.url, + repo=repo, + autosubscribe=autosubscribe, + lookuptag="&lookuptag={}".format(lookuptag) if lookuptag else "") res = self.http.post(url, headers=self.hdrs, verify=self.ssl_verify) if not self._checkResponse(res): @@ -681,10 +682,10 @@ def add_policy(self, name, rules, comment="", bundleid=None): A JSON object containing the policy description. ''' policy = { - 'name': name, - 'comment': comment, - 'rules': rules, - 'version': '1_0' + 'name': name, + 'comment': comment, + 'rules': rules, + 'version': '1_0' } if bundleid: policy['policyBundleId'] = bundleid @@ -699,8 +700,8 @@ def add_policy(self, name, rules, comment="", bundleid=None): def list_policy_bundles(self, detail=False): url = "{base_url}/api/scanning/v1/anchore/policies?detail={detail}".format( - base_url=self.url, - detail=str(detail)) + base_url=self.url, + detail=str(detail)) res = self.http.get(url, headers=self.hdrs, verify=self.ssl_verify) if not self._checkResponse(res): return [False, self.lasterr] @@ -787,28 +788,35 @@ def delete_policy(self, policyid, bundleid=None): def add_alert(self, name, description=None, scope="", triggers={'failed': True, 'unscanned': True}, enabled=False, notification_channels=[]): - '''**Description** - Create a new alert + ''' + Create a new alert + **Warning**: `add_alert` is deprecated and will be removed soon, use + `add_runtime_alert` or `add_repository_alert` from `ScanningAlertsClientV1` instead. - **Arguments** - - name: The name of the alert. - - description: The descprition of the alert. - - scope: An AND-composed string of predicates that selects the scope in which the alert will be applied. (like: 'host.domain = "example.com" and container.image != "alpine:latest"') - - tiggers: A dict {str: bool} indicating wich triggers should be enabled/disabled. (default: {'failed': True, 'unscanned': True}) - - enabled: Whether this alert should actually be applied. - - notification_channels: A list of notification channel ids. + Args: + name(str): The name of the alert. + description(str): The descprition of the alert. + scope(str): An AND-composed string of predicates that selects the scope in which the alert will be applied. (like: 'host.domain = "example.com" and container.image != "alpine:latest"') + tiggers(dict): A dict {str: bool} indicating wich triggers should be enabled/disabled. (default: {'failed': True, 'unscanned': True}) + enabled(bool): Whether this alert should actually be applied. + notification_channels(list): A list of notification channel ids. - **Success Return Value** + Returns: A JSON object containing the alert description. + + .. deprecated:: + `add_alert` is deprecated and will be removed soon, use + `add_runtime_alert` or `add_repository_alert` from `ScanningAlertsClientV1` instead. ''' + alert = { - 'name': name, - 'description': description, - 'triggers': triggers, - 'scope': scope, - 'enabled': enabled, - 'autoscan': True, - 'notificationChannelIds': notification_channels, + 'name': name, + 'description': description, + 'triggers': triggers, + 'scope': scope, + 'enabled': enabled, + 'autoscan': True, + 'notificationChannelIds': notification_channels, } url = self.url + '/api/scanning/v1/alerts' @@ -819,56 +827,22 @@ def add_alert(self, name, description=None, scope="", triggers={'failed': True, return [True, res.json()] - def list_alerts(self, limit=None, cursor=None): - '''**Description** - List the current set of scanning alerts. - - **Arguments** - - limit: Maximum number of alerts in the response. - - cursor: An opaque string representing the current position in the list of alerts. It's provided in the 'responseMetadata' of the list_alerts response. - - **Success Return Value** - A JSON object containing the list of alerts. + def update_alert(self, alertid, alert_description): ''' - url = self.url + '/api/scanning/v1/alerts' - if limit: - url += '?limit=' + str(limit) - if cursor: - url += '&cursor=' + cursor + Update the alert with the given id. + **Warning**: `update_alert` is deprecated and will be removed soon, use + `update_runtime_alert` or `update_repository_alert` from `ScanningAlertsClientV1` instead. - res = self.http.get(url, headers=self.hdrs, verify=self.ssl_verify) - if not self._checkResponse(res): - return [False, self.lasterr] - - return [True, res.json()] - - def get_alert(self, alertid): - '''**Description** - Retrieve the scanning alert with the given id - - **Arguments** - - alertid: Unique identifier associated with this alert. + Args: + alertid(str): Unique identifier associated with this alert. + alert_description(str): A dictionary with the alert description. - **Success Return Value** + Returns: A JSON object containing the alert description. - ''' - url = self.url + '/api/scanning/v1/alerts/' + alertid - res = self.http.get(url, headers=self.hdrs, verify=self.ssl_verify) - if not self._checkResponse(res): - return [False, self.lasterr] - - return [True, res.json()] - def update_alert(self, alertid, alert_description): - '''**Description** - Update the alert with the given id - - **Arguments** - - alertid: Unique identifier associated with this alert. - - alert_description: A dictionary with the alert description. - - **Success Return Value** - A JSON object containing the alert description. + .. deprecated:: + `update_alert` is deprecated and will be removed soon, use + `update_runtime_alert` or `update_repository_alert` from `ScanningAlertsClientV1` instead. ''' url = self.url + '/api/scanning/v1/alerts/' + alertid data = json.dumps(alert_description) @@ -878,20 +852,6 @@ def update_alert(self, alertid, alert_description): return [True, res.json()] - def delete_alert(self, policyid): - '''**Description** - Delete the alert with the given id - - **Arguments** - - alertid: Unique identifier associated with this alert. - ''' - url = self.url + '/api/scanning/v1/alerts/' + policyid - res = self.http.delete(url, headers=self.hdrs, verify=self.ssl_verify) - if not self._checkResponse(res): - return [False, self.lasterr] - - return [True, res.text] - def get_subscriptions(self, subscription_type=None, subscription_key=None): '''**Description** Get the list of subscriptions @@ -1020,8 +980,8 @@ def list_runtime(self, scope="", skip_policy_evaluation=True, start_time=None, e A JSON object representing the list of runtime containers. ''' containers = { - 'scope': scope, - 'skipPolicyEvaluation': skip_policy_evaluation + 'scope': scope, + 'skipPolicyEvaluation': skip_policy_evaluation } if start_time or end_time: containers['time'] = {} @@ -1084,7 +1044,7 @@ def get_vulnerability_details(self, id): url = f"{self.url}/api/scanning/v1/anchore/query/vulnerabilities" params = { - "id": id, + "id": id, } res = self.http.get(url, params=params, headers=self.hdrs, verify=self.ssl_verify) @@ -1103,9 +1063,9 @@ def add_vulnerability_exception_bundle(self, name, comment=""): url = f"{self.url}/api/scanning/v1/vulnexceptions" params = { - "version": "1_0", - "name": name, - "comment": comment, + "version": "1_0", + "name": name, + "comment": comment, } data = json.dumps(params) @@ -1129,7 +1089,7 @@ def list_vulnerability_exception_bundles(self): url = f"{self.url}/api/scanning/v1/vulnexceptions" params = { - "bundleId": "default", + "bundleId": "default", } res = self.http.get(url, params=params, headers=self.hdrs, verify=self.ssl_verify) @@ -1142,7 +1102,7 @@ def get_vulnerability_exception_bundle(self, bundle): url = f"{self.url}/api/scanning/v1/vulnexceptions/{bundle}" params = { - "bundleId": "default", + "bundleId": "default", } res = self.http.get(url, params=params, headers=self.hdrs, verify=self.ssl_verify) @@ -1158,11 +1118,11 @@ def add_vulnerability_exception(self, bundle, cve, note=None, expiration_date=No url = f"{self.url}/api/scanning/v1/vulnexceptions/{bundle}/vulnerabilities" params = { - "gate": "vulnerabilities", - "is_busy": False, - "trigger_id": f"{cve}+*", - "expiration_date": int(expiration_date) if expiration_date else None, - "notes": note, + "gate": "vulnerabilities", + "is_busy": False, + "trigger_id": f"{cve}+*", + "expiration_date": int(expiration_date) if expiration_date else None, + "notes": note, } data = json.dumps(params) @@ -1178,7 +1138,7 @@ def delete_vulnerability_exception(self, bundle, id): url = f"{self.url}/api/scanning/v1/vulnexceptions/{bundle}/vulnerabilities/{id}" params = { - "bundleId": "default", + "bundleId": "default", } res = self.http.delete(url, params=params, headers=self.hdrs, verify=self.ssl_verify) @@ -1191,15 +1151,15 @@ def update_vulnerability_exception(self, bundle, id, cve, enabled, note, expirat url = f"{self.url}/api/scanning/v1/vulnexceptions/{bundle}/vulnerabilities/{id}" data = { - "id": id, - "gate": "vulnerabilities", - "trigger_id": f"{cve}+*", - "enabled": enabled, - "notes": note, - "expiration_date": int(expiration_date) if expiration_date else None, + "id": id, + "gate": "vulnerabilities", + "trigger_id": f"{cve}+*", + "enabled": enabled, + "notes": note, + "expiration_date": int(expiration_date) if expiration_date else None, } params = { - "bundleId": "default", + "bundleId": "default", } res = self.http.put(url, data=json.dumps(data), params=params, headers=self.hdrs, verify=self.ssl_verify) @@ -1227,18 +1187,17 @@ def download_cve_report_csv(self, vuln_type="os", scope_type="static"): url = f"{self.url}/api/scanning/v1/reports/csv" params = { - "queryType": "vuln", - "scopeType": scope_type, - "staticScope": - { - "registry": "", - "repository": "", - "tag": "" + "queryType": "vuln", + "scopeType": scope_type, + "staticScope": { + "registry": "", + "repository": "", + "tag": "" }, - "runtimeScope": {}, - "imageQueryFilter": {"vType": vuln_type}, - "offset": 0, - "limit": 100000 + "runtimeScope": {}, + "imageQueryFilter": {"vType": vuln_type}, + "offset": 0, + "limit": 100000 } res = self.http.post(url, data=json.dumps(params), headers=self.hdrs, verify=self.ssl_verify) @@ -1272,7 +1231,7 @@ def get_image_scanning_results(self, image_name, policy_id=None): url = f"{self.url}/api/scanning/v1/images/{image_digest}/policyEvaluation" params = { - "tag": image_tag, + "tag": image_tag, } res = self.http.get(url, headers=self.hdrs, params=params, verify=self.ssl_verify) @@ -1282,17 +1241,17 @@ def get_image_scanning_results(self, image_name, policy_id=None): json_res = res.json() result = { - "image_digest": json_res["imageDigest"], - "image_id": json_res["imageId"], - "status": json_res["status"], - "image_tag": image_tag, - "total_stop": json_res["nStop"], - "total_warn": json_res["nWarn"], - "last_evaluation": datetime.utcfromtimestamp(json_res["at"]), - "policy_id": "*", - "policy_name": "All policies", - "warn_results": [], - "stop_results": [] + "image_digest": json_res["imageDigest"], + "image_id": json_res["imageId"], + "status": json_res["status"], + "image_tag": image_tag, + "total_stop": json_res["nStop"], + "total_warn": json_res["nWarn"], + "last_evaluation": datetime.utcfromtimestamp(json_res["at"]), + "policy_id": "*", + "policy_name": "All policies", + "warn_results": [], + "stop_results": [] } if policy_id: diff --git a/sdcclient/secure/__init__.py b/sdcclient/secure/__init__.py index 93e72f7e..1537faf9 100644 --- a/sdcclient/secure/__init__.py +++ b/sdcclient/secure/__init__.py @@ -2,9 +2,10 @@ from ._falco_rules_files_old import FalcoRulesFilesClientOld from ._policy_events_old import PolicyEventsClientOld from ._policy_events_v1 import PolicyEventsClientV1 -from ._policy_v2 import policy_action_capture, policy_action_kill, policy_action_pause, policy_action_stop, \ - PolicyClientV2 +from ._policy_v2 import (policy_action_capture, policy_action_kill, policy_action_pause, policy_action_stop, + PolicyClientV2) +from .scanning._alerts import ScanningAlertsClientV1 __all__ = ["PolicyEventsClientOld", "PolicyEventsClientV1", "FalcoRulesFilesClientOld", "PolicyClientV2", "policy_action_pause", "policy_action_stop", "policy_action_kill", "policy_action_capture", - "ActivityAuditClientV1", "ActivityAuditDataSource"] + "ActivityAuditClientV1", "ActivityAuditDataSource", "ScanningAlertsClientV1"] diff --git a/sdcclient/secure/scanning/__init__.py b/sdcclient/secure/scanning/__init__.py new file mode 100644 index 00000000..721ec957 --- /dev/null +++ b/sdcclient/secure/scanning/__init__.py @@ -0,0 +1,3 @@ +from ._alerts import ScanningAlertsClientV1 + +__all__ = ["ScanningAlertsClientV1"] diff --git a/sdcclient/secure/scanning/_alerts.py b/sdcclient/secure/scanning/_alerts.py new file mode 100644 index 00000000..7c721d03 --- /dev/null +++ b/sdcclient/secure/scanning/_alerts.py @@ -0,0 +1,381 @@ +import json + +from sdcclient._common import _SdcCommon + + +class ScanningAlertsClientV1(_SdcCommon): + def __init__(self, token="", sdc_url='https://secure.sysdig.com', ssl_verify=True, custom_headers=None): + super(ScanningAlertsClientV1, self).__init__(token, sdc_url, ssl_verify, custom_headers) + self.product = "SDS" + + class RepositoryAlertTrigger: + @staticmethod + def new_image_analyzed(alert): + alert["triggers"]["analysis_update"] = True + + @staticmethod + def scan_result_change_fail(alert): + alert["triggers"]["policy_eval"] = True + alert["onlyPassFail"] = True + + @staticmethod + def scan_result_change_any(alert): + alert["triggers"]["policy_eval"] = True + alert["onlyPassFail"] = False + + @staticmethod + def cve_update(alert): + alert["triggers"]["vuln_update"] = True + + def add_repository_alert(self, name, registry, repository, tag, description="", triggers=None, notification_channels=None, enabled=True): + ''' + Create a new repository alert + + Args: + name(str): The name of the alert. + registry(str): Registry to alert (e.g. docker.io) + repository(str): Repository to alert (e.g. sysdig/agent) + tag(str): Tag to alert (e.g. latest) + description(str): The description of the alert. + triggers(list): A list of RepositoryAlertTrigger indicating which triggers should be enabled. (default: [ScanningAlertsClientV1.RuntimeAlertTrigger.new_image_analyzed]) + notification_channels(list): A list of notification channel ids. + enabled(bool): Whether this alert should actually be applied. Defaults to true. + Returns: + The created alert. + + Examples: + >>> client = ScanningAlertsClientV1(sdc_url=os.getenv("SDC_SECURE_URL", "https://secure.sysdig.com"), + >>> token=os.getenv("SDC_SECURE_TOKEN")) + >>> ok, res = client.add_repository_alert( + >>> name="A name", + >>> registry="docker.io", + >>> repository="sysdig/agent", + >>> tag="latest", + >>> description="A description", + >>> triggers=[ScanningAlertsClientV1.RepositoryAlertTrigger.new_image_analyzed, + >>> ScanningAlertsClientV1.RepositoryAlertTrigger.scan_result_change_fail, + >>> ScanningAlertsClientV1.RepositoryAlertTrigger.cve_update] + >>>) + >>> if not ok: + >>> print(f"error creating alert: {res}") + >>> alert_id = res["alertId"] + ''' + if not triggers: + triggers = [ScanningAlertsClientV1.RepositoryAlertTrigger.new_image_analyzed] + + alert = { + 'name': name, + 'description': description, + 'type': 'repository', + 'triggers': { + "unscanned": False, + "analysis_update": False, + "vuln_update": False, + "policy_eval": False, + "failed": False + }, + 'repositories': [{ + 'registry': registry, + 'repository': repository, + 'tag': tag, + }], + "onlyPassFail": False, + "skipEventSend": False, + 'enabled': enabled, + 'notificationChannelIds': notification_channels, + } + + for trigger in triggers: + trigger(alert) + + res = self.http.post(f"{self.url}/api/scanning/v1/alerts", headers=self.hdrs, data=json.dumps(alert), verify=self.ssl_verify) + if not self._checkResponse(res): + return [False, self.lasterr] + + return [True, res.json()] + + def update_repository_alert(self, id, name=None, registry=None, repository=None, tag=None, description=None, triggers=None, notification_channels=None, enabled=None): + ''' + Updates a repository alert. Fields that are not specified, will not be modified. + + Args: + id(str): Alert ID. + name(str): The name of the alert. + registry(str): Registry to alert (e.g. docker.io) + repository(str): Repository to alert (e.g. sysdig/agent) + tag(str): Tag to alert (e.g. latest) + description(str): The description of the alert. + triggers(list): A list of RepositoryAlertTrigger indicating which triggers should be enabled. (default: [ScanningAlertsClientV1.RuntimeAlertTrigger.unscanned_image]) + notification_channels(list): A list of notification channel ids. + enabled(bool): Whether this alert should actually be applied. Defaults to true. + Returns: + The updated alert. + + Examples: + >>> client = ScanningAlertsClientV1(sdc_url=os.getenv("SDC_SECURE_URL", "https://secure.sysdig.com"), + >>> token=os.getenv("SDC_SECURE_TOKEN")) + >>> ok, res = client.update_repository_alert( + >>> id=alert_id, + >>> name="An updated name", + >>> registry="updated_registry", + >>> repository="updated_repository", + >>> tag="v1", + >>> description="An updated description", + >>> triggers=[ScanningAlertsClientV1.RepositoryAlertTrigger.scan_result_change_fail] + >>> ) + >>> if not ok: + >>> print(f"error updating alert: {res}") + >>> alert_id = res["alertId"] + ''' + ok, alert = self.get_alert(id) + if not ok: + return False, f"unable to retrieve alert by ID {id}: {alert}" + + if name is not None: + alert["name"] = name + if description is not None: + alert["description"] = description + if registry is not None: + alert["repositories"][0]["registry"] = registry + if repository is not None: + alert["repositories"][0]["repository"] = repository + if tag is not None: + alert["repositories"][0]["tag"] = tag + if triggers is not None: + alert["triggers"] = { + "unscanned": False, + "analysis_update": False, + "vuln_update": False, + "policy_eval": False, + "failed": False + } + alert["onlyPassFail"] = False + for trigger in triggers: + trigger(alert) + if notification_channels is not None: + alert["notificationChannelIds"] = notification_channels + if enabled is not None: + alert["enabled"] = enabled + + res = self.http.put(f"{self.url}/api/scanning/v1/alerts/{id}", headers=self.hdrs, data=json.dumps(alert), verify=self.ssl_verify) + if not self._checkResponse(res): + return [False, self.lasterr] + + return [True, res.json()] + + class RuntimeAlertTrigger: + @staticmethod + def unscanned_image(alert): + alert["triggers"]["unscanned"] = True + + @staticmethod + def scan_result_change_fail(alert): + alert["triggers"]["policy_eval"] = True + alert["onlyPassFail"] = True + + @staticmethod + def scan_result_change_any(alert): + alert["triggers"]["policy_eval"] = True + alert["onlyPassFail"] = False + + @staticmethod + def cve_update(alert): + alert["triggers"]["vuln_update"] = True + + def add_runtime_alert(self, name, description="", scope="", triggers=None, notification_channels=None, enabled=True): + ''' + Create a new runtime alert + + Args: + name(str): The name of the alert. + description(str): The description of the alert. + scope(str): An AND-composed string of predicates that selects the scope in which the alert will be applied. (like: 'host.domain = "example.com" and container.image != "alpine:latest"') + triggers(list): A list of RuntimeAlertTrigger indicating which triggers should be enabled. (default: [ScanningAlertsClientV1.RuntimeAlertTrigger.unscanned_image]) + notification_channels(list): A list of notification channel ids. + enabled(bool): Whether this alert should actually be applied. Defaults to true. + Returns: + The created alert. + + Examples: + >>> client = ScanningAlertsClientV1(sdc_url=os.getenv("SDC_SECURE_URL", "https://secure.sysdig.com"), + >>> token=os.getenv("SDC_SECURE_TOKEN")) + >>> ok, res = client.add_runtime_alert( + >>> name="A name", + >>> description="A description", + >>> scope="", + >>> triggers=[ScanningAlertsClientV1.RuntimeAlertTrigger.unscanned_image, + >>> ScanningAlertsClientV1.RuntimeAlertTrigger.scan_result_change_fail, + >>> ScanningAlertsClientV1.RuntimeAlertTrigger.cve_update] + >>>) + >>> if not ok: + >>> print(f"error creating alert: {res}") + >>> alert_id = res["alertId"] + ''' + if not triggers: + triggers = [ScanningAlertsClientV1.RuntimeAlertTrigger.unscanned_image] + + alert = { + 'name': name, + 'description': description, + 'type': 'runtime', + 'triggers': { + "unscanned": False, + "analysis_update": False, + "vuln_update": False, + "policy_eval": False, + "failed": False + }, + 'scope': scope, + "onlyPassFail": False, + "skipEventSend": False, + 'enabled': enabled, + 'notificationChannelIds': notification_channels, + } + + for trigger in triggers: + trigger(alert) + + res = self.http.post(f"{self.url}/api/scanning/v1/alerts", headers=self.hdrs, data=json.dumps(alert), verify=self.ssl_verify) + if not self._checkResponse(res): + return [False, self.lasterr] + + return [True, res.json()] + + def update_runtime_alert(self, id, name=None, description=None, scope=None, triggers=None, notification_channels=None, enabled=None): + ''' + Updates a runtime alert. Fields that are not specified, will not be modified. + + Args: + id(str): Alert ID. + name(str): The name of the alert. + description(str): The description of the alert. + scope(str): An AND-composed string of predicates that selects the scope in which the alert will be applied. (like: 'host.domain = "example.com" and container.image != "alpine:latest"') + triggers(list): A list of RuntimeAlertTrigger indicating which triggers should be enabled. (default: [ScanningAlertsClientV1.RuntimeAlertTrigger.unscanned_image]) + notification_channels(list): A list of notification channel ids. + enabled(bool): Whether this alert should actually be applied. Defaults to true. + Returns: + The updated alert. + + Examples: + >>> client = ScanningAlertsClientV1(sdc_url=os.getenv("SDC_SECURE_URL", "https://secure.sysdig.com"), + >>> token=os.getenv("SDC_SECURE_TOKEN")) + >>> ok, res = client.update_runtime_alert( + >>> id=alert_id, + >>> name="An updated name", + >>> description="An updated description", + >>> scope="agent.id = 'foo'", + >>> triggers=[ScanningAlertsClientV1.RuntimeAlertTrigger.scan_result_change_fail] + >>> ) + >>> if not ok: + >>> print(f"error updating alert: {res}") + >>> alert_id = res["alertId"] + ''' + ok, alert = self.get_alert(id) + if not ok: + return False, f"unable to retrieve alert by ID {id}: {alert}" + + if name is not None: + alert["name"] = name + if description is not None: + alert["description"] = description + if scope is not None: + alert["scope"] = scope + if triggers is not None: + alert["triggers"] = { + "unscanned": False, + "analysis_update": False, + "vuln_update": False, + "policy_eval": False, + "failed": False + } + alert["onlyPassFail"] = False + for trigger in triggers: + trigger(alert) + if notification_channels is not None: + alert["notificationChannelIds"] = notification_channels + if enabled is not None: + alert["enabled"] = enabled + + res = self.http.put(f"{self.url}/api/scanning/v1/alerts/{id}", headers=self.hdrs, data=json.dumps(alert), verify=self.ssl_verify) + if not self._checkResponse(res): + return [False, self.lasterr] + + return [True, res.json()] + + def get_alert(self, alertid): + ''' + Retrieve the scanning alert with the given id + + Args: + alertid: Unique identifier associated with this alert. + + Returns: + A JSON object containing the alert description. + + Examples: + >>> client = ScanningAlertsClientV1(sdc_url=os.getenv("SDC_SECURE_URL", "https://secure.sysdig.com"), + >>> token=os.getenv("SDC_SECURE_TOKEN")) + >>> ok, res = client.get_alert(alert_id) + >>> if not ok: + >>> print(f"error retrieving alert {alert_id}: {res}") + >>> alert = res + ''' + + res = self.http.get(f"{self.url}/api/scanning/v1/alerts/{alertid}", headers=self.hdrs, verify=self.ssl_verify) + if not self._checkResponse(res): + return [False, self.lasterr] + return [True, res.json()] + + def list_alerts(self, limit=None, cursor=None): + ''' + List the current set of scanning alerts. + Args: + limit(int): Maximum number of alerts in the response. + cursor: An opaque string representing the current position in the list of alerts. It's provided in the 'responseMetadata' of the list_alerts response. + + Returns: + A JSON containing the list of alerts in the 'alerts' field, and the current cursor position in the 'responseMetadata' field. + + Examples: + >>> client = ScanningAlertsClientV1(sdc_url=os.getenv("SDC_SECURE_URL", "https://secure.sysdig.com"), + >>> token=os.getenv("SDC_SECURE_TOKEN")) + >>> ok, res =client.list_alerts() + >>> if not ok: + >>> print(f"error listing alerts: {res}") + >>> for alert in res["alerts"]: + >>> print(alert["alertId"]) + >>> + >>> # Load more alerts + >>> if res["responseMetadata"] is not None: + >>> ok, res = client.list_alerts(cursor=res["responseMetadata"]["next_cursor"]) + ''' + + url = f"{self.url}/api/scanning/v1/alerts" + if limit: + url += '?limit=' + str(limit) + if cursor: + url += '&cursor=' + cursor + + res = self.http.get(url, headers=self.hdrs, verify=self.ssl_verify) + if not self._checkResponse(res): + return [False, self.lasterr] + + return [True, res.json()] + + def delete_alert(self, policyid): # FIXME: policyid must be maintained for backwards compatibility reasons with older versions, but should be renamed to id or alert_id + ''' + Delete the alert with the given id + + Args: + policyid: Unique identifier associated with this alert. + + Examples: + >>> client = ScanningAlertsClientV1(sdc_url=os.getenv("SDC_SECURE_URL", "https://secure.sysdig.com"), + >>> token=os.getenv("SDC_SECURE_TOKEN")) + >>> client.delete_alert(alert_id) + ''' + + res = self.http.delete(f"{self.url}/api/scanning/v1/alerts/{policyid}", headers=self.hdrs, verify=self.ssl_verify) + if not self._checkResponse(res): + return [False, self.lasterr] + return [True, res.text] diff --git a/specs/secure/scanning/alerts_spec.py b/specs/secure/scanning/alerts_spec.py new file mode 100644 index 00000000..428131a1 --- /dev/null +++ b/specs/secure/scanning/alerts_spec.py @@ -0,0 +1,243 @@ +import os + +from expects import be_empty, be_false, be_true, contain, contain_exactly, expect, have_keys +from mamba import after, before, context, description, it + +from sdcclient import SdScanningClient +from specs import be_successful_api_call + +with description("Scanning Alerts") as self: + with before.all: + self.client = SdScanningClient(sdc_url=os.getenv("SDC_SECURE_URL", "https://secure.sysdig.com"), + token=os.getenv("SDC_SECURE_TOKEN")) + + with after.all: + ok, res = self.client.list_alerts() + expect((ok, res)).to(be_successful_api_call) + + for alert in res["alerts"]: + self.client.delete_alert(alert["alertId"]) + + with it("lists all the scanning alerts"): + ok, res = self.client.add_runtime_alert( + name="A name", + description="A description", + scope="", + triggers=[SdScanningClient.RuntimeAlertTrigger.unscanned_image] + ) + expect((ok, res)).to(be_successful_api_call) + + ok, res = self.client.list_alerts() + expect((ok, res)).to(be_successful_api_call) + expect(res["alerts"]).to_not(be_empty) + expect(res["alerts"]).to(contain(have_keys("customerId", "teamId", "alertId", "enabled", "type", "name", "description", "scope", "repositories", "triggers", "autoscan", "onlyPassFail", "skipEventSend", "notificationChannelIds"))) + + with context("when creating a runtime alert"): + with it("creates an alert with unscanned image trigger"): + ok, res = self.client.add_runtime_alert( + name="A name", + description="A description", + scope="", + triggers=[SdScanningClient.RuntimeAlertTrigger.unscanned_image] + ) + + expect((ok, res)).to(be_successful_api_call) + expect(res).to(have_keys(name="A name", description="A description", scope="", triggers=have_keys(unscanned=be_true))) + + with it("creates an alert with scan result change trigger as 'Pass > Fail'"): + ok, res = self.client.add_runtime_alert( + name="A name", + description="A description", + scope="", + triggers=[SdScanningClient.RuntimeAlertTrigger.scan_result_change_fail] + ) + + expect((ok, res)).to(be_successful_api_call) + expect(res).to(have_keys(name="A name", description="A description", scope="", triggers=have_keys(policy_eval=be_true), onlyPassFail=be_true)) + + with it("creates an alert with scan result change trigger as 'Any Change'"): + ok, res = self.client.add_runtime_alert( + name="A name", + description="A description", + scope="", + triggers=[SdScanningClient.RuntimeAlertTrigger.scan_result_change_any] + ) + + expect((ok, res)).to(be_successful_api_call) + expect(res).to(have_keys(name="A name", description="A description", scope="", triggers=have_keys(policy_eval=be_true), onlyPassFail=be_false)) + + with it("creates an alert with cve update trigger"): + ok, res = self.client.add_runtime_alert( + name="A name", + description="A description", + scope="", + triggers=[SdScanningClient.RuntimeAlertTrigger.cve_update] + ) + + expect((ok, res)).to(be_successful_api_call) + expect(res).to(have_keys(name="A name", description="A description", scope="", triggers=have_keys(vuln_update=be_true))) + + with it("creates an alert with multiple triggers"): + ok, res = self.client.add_runtime_alert( + name="A name", + description="A description", + scope="", + triggers=[SdScanningClient.RuntimeAlertTrigger.unscanned_image, + SdScanningClient.RuntimeAlertTrigger.scan_result_change_fail, + SdScanningClient.RuntimeAlertTrigger.cve_update] + ) + + expect((ok, res)).to(be_successful_api_call) + expect(res).to(have_keys(name="A name", description="A description", scope="", triggers=have_keys(unscanned=be_true, policy_eval=be_true, vuln_update=be_true), onlyPassFail=be_true)) + + with context("when creating a repository alert"): + with it("creates an alert with new image analyzed trigger"): + ok, res = self.client.add_repository_alert( + name="A name", + registry="registry", + repository="repository", + tag="latest", + description="A description", + triggers=[SdScanningClient.RepositoryAlertTrigger.new_image_analyzed] + ) + + expect((ok, res)).to(be_successful_api_call) + expect(res).to(have_keys(name="A name", description="A description", scope="", triggers=have_keys(analysis_update=be_true))) + + with it("creates an alert with scan result change trigger as 'Pass > Fail'"): + ok, res = self.client.add_repository_alert( + name="A name", + registry="registry", + repository="repository", + tag="latest", + description="A description", + triggers=[SdScanningClient.RepositoryAlertTrigger.scan_result_change_fail] + ) + + expect((ok, res)).to(be_successful_api_call) + expect(res).to(have_keys(name="A name", description="A description", scope="", triggers=have_keys(policy_eval=be_true), onlyPassFail=be_true)) + + with it("creates an alert with scan result change trigger as 'Any Change'"): + ok, res = self.client.add_repository_alert( + name="A name", + registry="registry", + repository="repository", + tag="latest", + description="A description", + triggers=[SdScanningClient.RepositoryAlertTrigger.scan_result_change_any] + ) + + expect((ok, res)).to(be_successful_api_call) + expect(res).to(have_keys(name="A name", description="A description", scope="", triggers=have_keys(policy_eval=be_true), onlyPassFail=be_false)) + + with it("creates an alert with cve update trigger"): + ok, res = self.client.add_repository_alert( + name="A name", + registry="registry", + repository="repository", + tag="latest", + description="A description", + triggers=[SdScanningClient.RepositoryAlertTrigger.cve_update] + ) + + expect((ok, res)).to(be_successful_api_call) + expect(res).to(have_keys(name="A name", description="A description", scope="", triggers=have_keys(vuln_update=be_true))) + + with it("creates an alert with multiple triggers"): + ok, res = self.client.add_repository_alert( + name="A name", + registry="registry", + repository="repository", + tag="latest", + description="A description", + triggers=[SdScanningClient.RepositoryAlertTrigger.new_image_analyzed, + SdScanningClient.RepositoryAlertTrigger.scan_result_change_fail, + SdScanningClient.RepositoryAlertTrigger.cve_update] + ) + + expect((ok, res)).to(be_successful_api_call) + expect(res).to(have_keys(name="A name", description="A description", scope="", triggers=have_keys(analysis_update=be_true, policy_eval=be_true, vuln_update=be_true), onlyPassFail=be_true)) + + with it("removes an alert correctly"): + ok, res = self.client.add_runtime_alert( + name="A name", + description="A description", + scope="", + triggers=[SdScanningClient.RuntimeAlertTrigger.unscanned_image] + ) + + expect((ok, res)).to(be_successful_api_call) + + alert_id = res["alertId"] + ok, res = self.client.delete_alert(alert_id) + expect((ok, res)).to(be_successful_api_call) + + ok, res = self.client.list_alerts() + expect((ok, res)).to(be_successful_api_call) + expect(res["alerts"]).to_not(contain(have_keys(id=alert_id))) + + with it("retrieves an alert correctly"): + ok, res = self.client.add_runtime_alert( + name="A name", + description="A description", + scope="", + triggers=[SdScanningClient.RuntimeAlertTrigger.unscanned_image] + ) + expect((ok, res)).to(be_successful_api_call) + + alert_id = res["alertId"] + ok, res = self.client.get_alert(alert_id) + expect((ok, res)).to(be_successful_api_call) + expect(res).to(have_keys(name="A name", description="A description", scope="", triggers=have_keys(unscanned=be_true))) + + with it("updates a runtime alert correctly"): + ok, res = self.client.add_runtime_alert( + name="A name", + description="A description", + scope="", + triggers=[SdScanningClient.RuntimeAlertTrigger.unscanned_image] + ) + expect((ok, res)).to(be_successful_api_call) + expect(res).to(have_keys(name="A name", description="A description", scope="", triggers=have_keys(unscanned=be_true))) + + alert_id = res["alertId"] + ok, res = self.client.update_runtime_alert( + id=alert_id, + name="An updated name", + description="An updated description", + scope="agent.id = 'foo'", + triggers=[SdScanningClient.RuntimeAlertTrigger.scan_result_change_fail] + ) + expect((ok, res)).to(be_successful_api_call) + expect(res).to(have_keys(name="An updated name", description="An updated description", scope="agent.id = 'foo'", triggers=have_keys(unscanned=be_false, policy_eval=be_true), onlyPassFail=be_true)) + + with it("updates a repository alert correctly"): + ok, res = self.client.add_repository_alert( + name="A name", + registry="registry", + repository="repository", + tag="latest", + description="A description", + triggers=[SdScanningClient.RepositoryAlertTrigger.new_image_analyzed] + ) + + expect((ok, res)).to(be_successful_api_call) + expect(res).to(have_keys(name="A name", description="A description", + repositories=contain_exactly(have_keys(registry="registry", repository="repository", tag="latest")), + triggers=have_keys(analysis_update=be_true))) + + alert_id = res["alertId"] + ok, res = self.client.update_repository_alert( + id=alert_id, + name="An updated name", + registry="new_registry", + repository="new_repository", + tag="v1", + description="An updated description", + triggers=[SdScanningClient.RepositoryAlertTrigger.scan_result_change_fail] + ) + expect((ok, res)).to(be_successful_api_call) + expect(res).to(have_keys(name="An updated name", + description="An updated description", + repositories=contain_exactly(have_keys(registry="new_registry", repository="new_repository", tag="v1")), + triggers=have_keys(unscanned=be_false, policy_eval=be_true), onlyPassFail=be_true))