From 818f7d2721751f25eec4bdb3feadb3029479da9a Mon Sep 17 00:00:00 2001 From: Fede Barcelona Date: Wed, 17 Mar 2021 15:24:45 +0100 Subject: [PATCH] feat: Add policy types support (#186) * feat: Add policy types support * ci: Ignore assert order --- sdcclient/_secure.py | 280 +--------------------- sdcclient/secure/__init__.py | 5 +- sdcclient/secure/_policy_v2.py | 351 ++++++++++++++++++++++++++++ specs/monitor/dashboards_v3_spec.py | 4 +- specs/secure/policy_v2_spec.py | 23 +- 5 files changed, 370 insertions(+), 293 deletions(-) create mode 100644 sdcclient/secure/_policy_v2.py diff --git a/sdcclient/_secure.py b/sdcclient/_secure.py index e8262d0c..49468e19 100644 --- a/sdcclient/_secure.py +++ b/sdcclient/_secure.py @@ -2,12 +2,13 @@ import time from sdcclient._common import _SdcCommon -from sdcclient.secure import FalcoRulesFilesClientOld, PolicyEventsClientV1, PolicyEventsClientOld +from sdcclient.secure import FalcoRulesFilesClientOld, PolicyEventsClientV1, PolicyEventsClientOld, PolicyClientV2 class SdSecureClient(FalcoRulesFilesClientOld, PolicyEventsClientV1, PolicyEventsClientOld, + PolicyClientV2, _SdcCommon): def __init__(self, token="", sdc_url='https://secure.sysdig.com', ssl_verify=True, custom_headers=None): super(SdSecureClient, self).__init__(token, sdc_url, ssl_verify, custom_headers) @@ -25,283 +26,6 @@ def policy_v2(self): self._policy_v2 = res.status_code != 404 return self._policy_v2 - def create_default_policies(self): - '''**Description** - Create new policies based on the currently available set of rules. For now, this only covers Falco rules, but we might extend - the endpoint later. The backend should use the defaultPolicies property of a previously provided FalcoRulesFiles model as - guidance on the set of policies to create. The backend should only create new policies (not delete or modify), and should only - create new policies if there is not an existing policy with the same name. - - **Arguments** - - None - - **Success Return Value** - JSON containing details on any new policies that were added. - - **Example** - `examples/create_default_policies.py `_ - - ''' - res = self.http.post(self.url + '/api/v2/policies/default', headers=self.hdrs, verify=self.ssl_verify) - return self._request_result(res) - - def delete_all_policies(self): - '''**Description** - Delete all existing policies. The falco rules file is unchanged. - - **Arguments** - - None - - **Success Return Value** - The string "Policies Deleted" - - **Example** - `examples/delete_all_policies.py `_ - - ''' - ok, res = self.list_policies() - if not ok: - return False, res - - for policy in res: - ok, res = self.delete_policy_id(policy["id"]) - if not ok: - return False, res - - return True, "Policies Deleted" - - def list_policies(self): - '''**Description** - List the current set of policies. - - **Arguments** - - None - - **Success Return Value** - A JSON object containing the number and details of each policy. - - **Example** - `examples/list_policies.py `_ - - ''' - res = self.http.get(self.url + '/api/v2/policies', headers=self.hdrs, verify=self.ssl_verify) - return self._request_result(res) - - def get_policy(self, name): - '''**Description** - Find the policy with name and return its json description. - - **Arguments** - - name: the name of the policy to fetch - - **Success Return Value** - A JSON object containing the description of the policy. If there is no policy with - the given name, returns False. - - **Example** - `examples/get_policy.py `_ - - ''' - ok, res = self.list_policies() - if not ok: - return [False, res] - policies = res - - # Find the policy with the given name and return it. - for policy in policies: - if policy["name"] == name: - return [True, policy] - - return [False, "No policy with name {}".format(name)] - - def get_policy_id(self, id): - '''**Description** - Find the policy with id and return its json description. - - **Arguments** - - id: the id of the policy to fetch - - **Success Return Value** - A JSON object containing the description of the policy. If there is no policy with - the given name, returns False. - ''' - res = self.http.get(self.url + '/api/v2/policies/{}'.format(id), headers=self.hdrs, verify=self.ssl_verify) - return self._request_result(res) - - def add_policy(self, name, description, rule_names=[], actions=[], scope=None, severity=0, enabled=True, - notification_channels=[]): - '''**Description** - Add a new policy. - - **Arguments** - - name: A short name for the policy - - description: Description of policy - - rule_names: Array of rule names. (They must be names instead of ids, as the rules list view is by name, to account for multiple rules having the same name). - - actions: It can be a stop, pause and/or capture action - - scope: Where the policy is being applied- Container, Host etc.. (example: "container.image.repository = sysdig/agent") - - enabled: True if the policy should be considered - - severity: How severe is this policy when violated. Range from 0 to 7 included. - - notification_channels: ids of the notification channels to subscribe to the policy - - **Success Return Value** - The string "OK" - ''' - policy = { - "name": name, - "description": description, - "ruleNames": rule_names, - "actions": actions, - "scope": scope, - "severity": severity, - "enabled": enabled, - "notificationChannelIds": notification_channels - } - res = self.http.post(self.url + '/api/v2/policies', headers=self.hdrs, data=json.dumps(policy), - verify=self.ssl_verify) - return self._request_result(res) - - def add_policy_json(self, policy_json): - '''**Description** - Add a new policy using the provided json. - - **Arguments** - - policy_json: a description of the new policy - - **Success Return Value** - The string "OK" - - **Example** - `examples/add_policy.py `_ - - ''' - - try: - policy_obj = json.loads(policy_json) - if "origin" in policy_obj: - del policy_obj["origin"] - except Exception as e: - return [False, "policy json is not valid json: {}".format(str(e))] - - res = self.http.post(self.url + '/api/v2/policies', headers=self.hdrs, data=json.dumps(policy_obj), - verify=self.ssl_verify) - return self._request_result(res) - - def update_policy(self, id, name=None, description=None, rule_names=None, actions=None, scope=None, - severity=None, enabled=None, notification_channels=None): - '''**Description** - Update policy with the provided values. - - **Arguments** - - id: the id of the policy to update - - name: A short name for the policy - - description: Description of policy - - rule_names: Array of rule names. (They must be names instead of ids, as the rules list view is by name, to account for multiple rules having the same name). - - actions: It can be a stop, pause and/or capture action - - scope: Where the policy is being applied- Container, Host etc.. (example: "container.image.repository = sysdig/agent") - - enabled: True if the policy should be considered - - severity: How severe is this policy when violated. Range from 0 to 7 included. - - notification_channels: ids of the notification channels to subscribe to the policy - - **Success Return Value** - The string "OK" - ''' - ok, res = self.get_policy_id(id) - if not ok: - return [False, res] - policy = res - - if name is not None: - policy["name"] = name - if description is not None: - policy["description"] = description - if rule_names is not None: - policy["ruleNames"] = rule_names - if actions is not None: - policy["actions"] = actions - if scope is not None: - policy["scope"] = scope - if severity is not None: - policy["severity"] = severity - if enabled is not None: - policy["enabled"] = enabled - if notification_channels is not None: - policy["notificationChannelIds"] = notification_channels - - res = self.http.put(self.url + '/api/v2/policies/{}'.format(id), headers=self.hdrs, data=json.dumps(policy), - verify=self.ssl_verify) - return self._request_result(res) - - def update_policy_json(self, policy_json): - '''**Description** - Update an existing policy using the provided json. The 'id' field from the policy is - used to determine which policy to update. - - **Arguments** - - policy_json: a description of the new policy - - **Success Return Value** - The string "OK" - - **Example** - `examples/update_policy.py `_ - - ''' - try: - policy_obj = json.loads(policy_json) - if "origin" in policy_obj: - del policy_obj["origin"] - except Exception as e: - return [False, "policy json is not valid json: {}".format(str(e))] - - if "id" not in policy_obj: - return [False, "Policy Json does not have an 'id' field"] - - res = self.http.put(self.url + '/api/v2/policies/{}'.format(policy_obj["id"]), headers=self.hdrs, - data=json.dumps(policy_obj), verify=self.ssl_verify) - return self._request_result(res) - - def delete_policy_name(self, name): - '''**Description** - Delete the policy with the given name. - - **Arguments** - - name: the name of the policy to delete - - **Success Return Value** - The JSON object representing the now-deleted policy. - - **Example** - `examples/delete_policy.py `_ - - ''' - ok, res = self.list_policies() - if not ok: - return [False, res] - - # Find the policy with the given name and delete it - for policy in res: - if policy["name"] == name: - return self.delete_policy_id(policy["id"]) - - return [False, "No policy with name {}".format(name)] - - def delete_policy_id(self, id): - '''**Description** - Delete the policy with the given id - - **Arguments** - - id: the id of the policy to delete - - **Success Return Value** - The JSON object representing the now-deleted policy. - - **Example** - `examples/delete_policy.py `_ - - ''' - res = self.http.delete(self.url + '/api/v2/policies/{}'.format(id), headers=self.hdrs, verify=self.ssl_verify) - return self._request_result(res) - def list_rules(self): '''**Description** Returns the list of rules in the system. These are grouped by name diff --git a/sdcclient/secure/__init__.py b/sdcclient/secure/__init__.py index a0dbb464..2af04a75 100644 --- a/sdcclient/secure/__init__.py +++ b/sdcclient/secure/__init__.py @@ -1,5 +1,8 @@ from ._falco_rules_files_old import FalcoRulesFilesClientOld from ._policy_events_old import PolicyEventsClientOld from ._policy_events_v1 import PolicyEventsClientV1 +from ._policy_v2 import PolicyClientV2, policy_action_pause, policy_action_stop, policy_action_kill, \ + policy_action_capture -__all__ = ["PolicyEventsClientOld", "PolicyEventsClientV1", "FalcoRulesFilesClientOld"] +__all__ = ["PolicyEventsClientOld", "PolicyEventsClientV1", "FalcoRulesFilesClientOld", + "PolicyClientV2", "policy_action_pause", "policy_action_stop", "policy_action_kill", "policy_action_capture"] diff --git a/sdcclient/secure/_policy_v2.py b/sdcclient/secure/_policy_v2.py new file mode 100644 index 00000000..ff88fade --- /dev/null +++ b/sdcclient/secure/_policy_v2.py @@ -0,0 +1,351 @@ +import json + +from sdcclient._common import _SdcCommon + + +def policy_action_stop(): + return { + "type": "POLICY_ACTION_STOP", + } + + +def policy_action_capture(file_name, secs_before=5, secs_after=15, filter=""): + return { + "afterEventNs": secs_after * 1_000_000_000, + "beforeEventNs": secs_before * 1_000_000_000, + "isLimitedToContainer": False, + "type": "POLICY_ACTION_CAPTURE", + "filter": filter, + "name": file_name, + "bucketName": "", + "storageType": "S3" + } + + +def policy_action_pause(): + return { + "type": "POLICY_ACTION_PAUSE", + } + + +def policy_action_kill(): + return { + "type": "POLICY_ACTION_KILL", + } + + +class PolicyClientV2(_SdcCommon): + def __init__(self, token="", sdc_url='https://secure.sysdig.com', ssl_verify=True, custom_headers=None): + super(PolicyClientV2, self).__init__(token, sdc_url, ssl_verify, custom_headers) + self.product = "SDS" + + def create_default_policies(self): + """ + Create new policies based on the currently available set of rules. For now, this only covers Falco rules, but we might extend + the endpoint later. The backend should use the defaultPolicies property of a previously provided FalcoRulesFiles model as + guidance on the set of policies to create. The backend should only create new policies (not delete or modify), and should only + create new policies if there is not an existing policy with the same name. + + Returns: A touple (bool, res/err) where the first element indicates if the API call was successful and the second the error or the result. + + Examples: + >>> from sdcclient.secure import PolicyClientV2 + >>> client = PolicyClientV2(sdc_url="https://secure.sysdig.com", token=SECURE_TOKEN) + >>> ok, res = client.create_default_policies() + """ + + res = self.http.post(self.url + '/api/v2/policies/default', headers=self.hdrs, verify=self.ssl_verify) + return self._request_result(res) + + def delete_all_policies(self): + """ + Delete all existing policies. + + Returns: A touple (bool, res/err) where the first element indicates if the API call was successful and the second the error or the result. + + Examples: + >>> from sdcclient.secure import PolicyClientV2 + >>> client = PolicyClientV2(sdc_url="https://secure.sysdig.com", token=SECURE_TOKEN) + >>> ok, res = client.delete_all_policies() + """ + + ok, res = self.list_policies() + if not ok: + return False, res + + for policy in res: + ok, res = self.delete_policy_id(policy["id"]) + if not ok: + return False, res + + return True, "Policies Deleted" + + def list_policies(self): + """ + List the current set of policies. + + Returns: A touple (bool, res/err) where the first element indicates if the API call was successful and the second the error or the result. + + Examples: + >>> from sdcclient.secure import PolicyClientV2 + >>> client = PolicyClientV2(sdc_url="https://secure.sysdig.com", token=SECURE_TOKEN) + >>> ok, res = client.list_policies() + """ + + res = self.http.get(self.url + '/api/v2/policies', headers=self.hdrs, verify=self.ssl_verify) + return self._request_result(res) + + def get_policy(self, name): + """ + Find the policy with name and return its json description. + + Args: + name(str): The name of the policy to fetch + + Returns: A touple (bool, res/err) where the first element indicates if the API call was successful and the + second the error or the JSON object containing the policy. + + Examples: + >>> import json + >>> from sdcclient.secure import PolicyClientV2 + >>> client = PolicyClientV2(sdc_url="https://secure.sysdig.com", token=SECURE_TOKEN) + >>> ok, res = client.get_policy(name="Terminal shell in container") + >>> if ok: + >>> print((json.dumps(res, indent=2))) + """ + + ok, res = self.list_policies() + if not ok: + return [False, res] + policies = res + + # Find the policy with the given name and return it. + for policy in policies: + if policy["name"] == name: + return [True, policy] + + return [False, "No policy with name {}".format(name)] + + def get_policy_id(self, id): + """ + Find the policy with id and return its json description. + + Args: + id(int): The id of the policy to fetch + + Returns: A touple (bool, res/err) where the first element indicates if the API call was successful and the + second the error or the JSON object containing the policy. + + Examples: + >>> import json + >>> from sdcclient.secure import PolicyClientV2 + >>> client = PolicyClientV2(sdc_url="https://secure.sysdig.com", token=SECURE_TOKEN) + >>> ok, res = client.get_policy_id(id=123456) + >>> if ok: + >>> print((json.dumps(res, indent=2))) + """ + + res = self.http.get(self.url + '/api/v2/policies/{}'.format(id), headers=self.hdrs, verify=self.ssl_verify) + return self._request_result(res) + + def add_policy(self, name, description, rule_names=[], actions=[], scope=None, severity=0, enabled=True, + notification_channels=[], type="falco"): + """ + Adds a new policy. + + Args: + name(str): A short name for the policy + description(str): Description of policy + rule_names(list): Array of rule names. (They must be names instead of ids, as the rules list view is by name, to account for multiple rules having the same name). + actions(list): It can be a `policy_action_stop()`, `policy_action_pause()`, `policy_action_capture()` or `policy_action_kill()` action + scope(str): Where the policy is being applied- Container, Host etc.. (example: "container.image.repository = sysdig/agent") + severity(int): True if the policy should be considered + enabled(bool): How severe is this policy when violated. Range from 0 to 7 included. + notification_channels(list): ids of the notification channels to subscribe to the policy + type(str): Type of the Policy. It can be one of: `falco`, `list_matching`, `k8s_audit`. + + Returns: A touple (bool, res/err) where the first element indicates if the API call was successful and the second the error or the result. + + Examples: + >>> from sdcclient.secure import PolicyClientV2, policy_action_stop + >>> client = PolicyClientV2(sdc_url="https://secure.sysdig.com", token=SECURE_TOKEN) + >>> ok, res = client.add_policy(name="Terminal shell in container", + description="A shell was spawned by a program in a container with an attached terminal.", + rule_names=["Terminal shell in container"], + actions=[policy_action_stop()], + type="falco") + """ + + policy = { + "name": name, + "description": description, + "ruleNames": rule_names, + "actions": actions, + "scope": scope, + "severity": severity, + "enabled": enabled, + "notificationChannelIds": notification_channels, + "type": type, + } + + res = self.http.post(self.url + '/api/v2/policies', headers=self.hdrs, data=json.dumps(policy), + verify=self.ssl_verify) + return self._request_result(res) + + def add_policy_json(self, policy_json): + """ + Add a new policy using the provided json. + + Args: + policy_json: a description of the new policy + + Returns: A touple (bool, res/err) where the first element indicates if the API call was successful and the second the error or the result. + + Examples: + >>> import sys + >>> from sdcclient.secure import PolicyClientV2 + >>> client = PolicyClientV2(sdc_url="https://secure.sysdig.com", token=SECURE_TOKEN) + >>> policy_json = sys.stdin.read() + >>> ok, res = client.add_policy_json(policy_json) + + """ + + try: + policy_obj = json.loads(policy_json) + if "origin" in policy_obj: + del policy_obj["origin"] + except Exception as e: + return [False, "policy json is not valid json: {}".format(str(e))] + + res = self.http.post(self.url + '/api/v2/policies', headers=self.hdrs, data=json.dumps(policy_obj), + verify=self.ssl_verify) + return self._request_result(res) + + def update_policy(self, id, name=None, description=None, rule_names=None, actions=None, scope=None, + severity=None, enabled=None, notification_channels=None): + """ + Update policy with the provided values. Only the defined values will be updated. + + Args: + id(int): The id of the policy to update + name(str): A short name for the policy + description(str): Description of policy + rule_names(list): Array of rule names. (They must be names instead of ids, as the rules list view is by name, to account for multiple rules having the same name). + actions(list): It can be a `policy_action_stop()`, `policy_action_pause()`, `policy_action_capture()` or `policy_action_kill()` action + scope(str): Where the policy is being applied- Container, Host etc.. (example: "container.image.repository = sysdig/agent") + severity(int): True if the policy should be considered + enabled(bool): How severe is this policy when violated. Range from 0 to 7 included. + notification_channels(list): ids of the notification channels to subscribe to the policy + + Returns: A touple (bool, res/err) where the first element indicates if the API call was successful and the second the error or the result. + + Examples: + >>> from sdcclient.secure import PolicyClientV2, policy_action_stop + >>> client = PolicyClientV2(sdc_url="https://secure.sysdig.com", token=SECURE_TOKEN) + >>> ok, res = client.update_policy(name="Terminal shell in container", + description="A shell was spawned by a program in a container with an attached terminal.", + rule_names=["Terminal shell in container"], + actions=[policy_action_stop()]) + """ + + ok, res = self.get_policy_id(id) + if not ok: + return [False, res] + policy = res + + if name is not None: + policy["name"] = name + if description is not None: + policy["description"] = description + if rule_names is not None: + policy["ruleNames"] = rule_names + if actions is not None: + policy["actions"] = actions + if scope is not None: + policy["scope"] = scope + if severity is not None: + policy["severity"] = severity + if enabled is not None: + policy["enabled"] = enabled + if notification_channels is not None: + policy["notificationChannelIds"] = notification_channels + + res = self.http.put(self.url + '/api/v2/policies/{}'.format(id), headers=self.hdrs, data=json.dumps(policy), + verify=self.ssl_verify) + return self._request_result(res) + + def update_policy_json(self, policy_json): + """ + Update an existing policy using the provided json. The 'id' field from the policy is + used to determine which policy to update. + + Args: + policy_json(str): A description of the new policy + + Returns: A touple (bool, res/err) where the first element indicates if the API call was successful and the second the error or the result. + + Examples: + >>> import sys + >>> from sdcclient.secure import PolicyClientV2 + >>> client = PolicyClientV2(sdc_url="https://secure.sysdig.com", token=SECURE_TOKEN) + >>> policy_json = sys.stdin.read() + >>> ok, res = client.update_policy_json(policy_json) + """ + + try: + policy_obj = json.loads(policy_json) + if "origin" in policy_obj: + del policy_obj["origin"] + except Exception as e: + return [False, "policy json is not valid json: {}".format(str(e))] + + if "id" not in policy_obj: + return [False, "Policy Json does not have an 'id' field"] + + res = self.http.put(self.url + '/api/v2/policies/{}'.format(policy_obj["id"]), headers=self.hdrs, + data=json.dumps(policy_obj), verify=self.ssl_verify) + return self._request_result(res) + + def delete_policy_name(self, name): + """ + Delete the policy with the given name. + + Args: + name(str): The name of the policy to delete + + Returns: A touple (bool, res/err) where the first element indicates if the API call was successful and the second the error or the JSON object representing the now-deleted policy. + + Examples: + >>> from sdcclient.secure import PolicyClientV2 + >>> client = PolicyClientV2(sdc_url="https://secure.sysdig.com", token=SECURE_TOKEN) + >>> ok, res = client.delete_policy_name(name="Terminal shell in container") + """ + + ok, res = self.list_policies() + if not ok: + return [False, res] + + # Find the policy with the given name and delete it + for policy in res: + if policy["name"] == name: + return self.delete_policy_id(policy["id"]) + + return [False, "No policy with name {}".format(name)] + + def delete_policy_id(self, id): + """ + Delete the policy with the given name. + + Args: + id(int): The id of the policy to delete + + Returns: A touple (bool, res/err) where the first element indicates if the API call was successful and the second the error or the JSON object representing the now-deleted policy. + + Examples: + >>> from sdcclient.secure import PolicyClientV2 + >>> client = PolicyClientV2(sdc_url="https://secure.sysdig.com", token=SECURE_TOKEN) + >>> ok, res = client.delete_policy_id(id=123456) + """ + + res = self.http.delete(self.url + '/api/v2/policies/{}'.format(id), headers=self.hdrs, verify=self.ssl_verify) + return self._request_result(res) diff --git a/specs/monitor/dashboards_v3_spec.py b/specs/monitor/dashboards_v3_spec.py index 31f6dc4a..1c2a574d 100644 --- a/specs/monitor/dashboards_v3_spec.py +++ b/specs/monitor/dashboards_v3_spec.py @@ -267,5 +267,5 @@ def create_test_dashboard(self): expect(res_team2["dashboard"]).to(have_key("shared", True)) expect(res_team2["dashboard"]).to(have_key("sharingSettings")) expect(res_team2["dashboard"]["sharingSettings"]).to(have_len(2)) - expect(res_team2["dashboard"]["sharingSettings"][0]["role"]).to(equal("ROLE_RESOURCE_READ")) - expect(res_team2["dashboard"]["sharingSettings"][1]["role"]).to(equal("ROLE_RESOURCE_EDIT")) + expect(res_team2["dashboard"]["sharingSettings"]).to(contain(have_keys(role=equal("ROLE_RESOURCE_READ")))) + expect(res_team2["dashboard"]["sharingSettings"]).to(contain(have_keys(role=equal("ROLE_RESOURCE_EDIT")))) diff --git a/specs/secure/policy_v2_spec.py b/specs/secure/policy_v2_spec.py index d9385534..4cdc9b83 100644 --- a/specs/secure/policy_v2_spec.py +++ b/specs/secure/policy_v2_spec.py @@ -2,24 +2,16 @@ import os from expects import expect -from mamba import before, description, after, it +from mamba import before, after, it, description from sdcclient import SdSecureClient +from sdcclient.secure import policy_action_capture from specs import be_successful_api_call _POLICY_NAME = "Test - Terminal shell in container" _POLICY_DESCRIPTION = "A shell was spawned by a program in a container with an attached terminal." _POLICY_RULES = ["Terminal shell in container"] -_POLICY_ACTIONS = [{ - "type": "POLICY_ACTION_CAPTURE", - "name": "Terminal shell in container", - "filter": "", - "storageType": "S3", - "bucketName": "", - "isLimitedToContainer": False, - "beforeEventNs": 10000000000, - "afterEventNs": 20000000000 -}] +_POLICY_ACTIONS = [policy_action_capture(file_name="TerminalShellInContainer", secs_before=10, secs_after=20)] def policy_json(): @@ -44,9 +36,14 @@ def policy_json(): with before.all: self.client = SdSecureClient(sdc_url=os.getenv("SDC_SECURE_URL", "https://secure.sysdig.com"), token=os.getenv("SDC_SECURE_TOKEN")) + + with before.each: + self.cleanup_policies() + with after.each: self.cleanup_policies() + def cleanup_policies(self): _, res = self.client.list_policies() for policy in res: @@ -54,6 +51,7 @@ def cleanup_policies(self): ok, res = self.client.delete_policy_id(policy["id"]) expect((ok, res)).to(be_successful_api_call) + with it("is able to list all existing policies"): ok, res = self.client.list_policies() expect((ok, res)).to(be_successful_api_call) @@ -66,7 +64,8 @@ def cleanup_policies(self): ok, res = self.client.add_policy(name=_POLICY_NAME, description=_POLICY_DESCRIPTION, rule_names=_POLICY_RULES, - actions=_POLICY_ACTIONS) + actions=_POLICY_ACTIONS, + type="falco") expect((ok, res)).to(be_successful_api_call)