-
Notifications
You must be signed in to change notification settings - Fork 78
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[AAP-13408] Add support for custom actions #532
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -107,3 +107,5 @@ ENV/ | |
|
||
# awx provision | ||
tests/e2e/utils/awx/artifacts | ||
|
||
.DS_Store |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
from ansible_rulebook.action.control import Control | ||
from ansible_rulebook.action.helper import Helper | ||
from ansible_rulebook.action.metadata import Metadata | ||
|
||
__all__ = ["Control", "Helper", "Metadata"] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,6 +41,9 @@ | |
|
||
EDA_YAML_EXTENSIONS = [".yml", ".yaml"] | ||
|
||
EDA_ACTION_PATHS = [ | ||
f"{EDA_PATH_PREFIX}/plugins/rule_action", | ||
] | ||
logger = logging.getLogger(__name__) | ||
|
||
|
||
|
@@ -138,6 +141,24 @@ def load_rulebook(collection, rulebook): | |
return yaml.safe_load(f.read()) | ||
|
||
|
||
def has_action(collection, action): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @benthomasson if we can take in an additional parameter additional_dirs, here which is the directory names we can search for the files in the collection location and an additional directory
|
||
return has_object( | ||
collection, | ||
action, | ||
EDA_ACTION_PATHS, | ||
".py", | ||
) | ||
|
||
|
||
def find_action(collection, action): | ||
return find_object( | ||
collection, | ||
action, | ||
EDA_ACTION_PATHS, | ||
".py", | ||
) | ||
|
||
|
||
def has_source(collection, source): | ||
return has_object( | ||
collection, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,8 @@ | |
import asyncio | ||
import gc | ||
import logging | ||
import os | ||
import runpy | ||
import uuid | ||
from pprint import pformat | ||
from types import MappingProxyType | ||
|
@@ -42,6 +44,11 @@ | |
from ansible_rulebook.action.run_workflow_template import RunWorkflowTemplate | ||
from ansible_rulebook.action.set_fact import SetFact | ||
from ansible_rulebook.action.shutdown import Shutdown as ShutdownAction | ||
from ansible_rulebook.collection import ( | ||
find_action, | ||
has_action, | ||
split_collection_name, | ||
) | ||
from ansible_rulebook.conf import settings | ||
from ansible_rulebook.exception import ( | ||
ShutdownException, | ||
|
@@ -89,6 +96,7 @@ def __init__( | |
project_data_file: Optional[str] = None, | ||
parsed_args=None, | ||
broadcast_method=None, | ||
action_directories=None, | ||
): | ||
self.action_loop_task = None | ||
self.event_log = event_log | ||
|
@@ -104,6 +112,14 @@ def __init__( | |
self.broadcast_method = broadcast_method | ||
self.event_counter = 0 | ||
self.display = terminal.Display() | ||
self.action_directories = action_directories | ||
|
||
def find_action(self, action: str): | ||
for action_dir in self.action_directories: | ||
action_plugin_file = os.path.join(action_dir, f"{action}.py") | ||
if os.path.exists(action_plugin_file): | ||
return runpy.run_path(action_plugin_file) | ||
return None | ||
|
||
async def run_ruleset(self): | ||
tasks = [] | ||
|
@@ -352,6 +368,89 @@ def _run_action( | |
task.add_done_callback(self._handle_action_completion) | ||
return task | ||
|
||
def _build_control( | ||
self, | ||
action, | ||
action_args, | ||
rules_engine_result, | ||
variables, | ||
metadata, | ||
inventory, | ||
hosts, | ||
): | ||
if action == "run_job_template" or action == "run_workflow_template": | ||
limit = dpath.get( | ||
action_args, | ||
"job_args.limit", | ||
separator=".", | ||
default=None, | ||
) | ||
if isinstance(limit, list): | ||
hosts = limit | ||
elif isinstance(limit, str): | ||
hosts = [limit] | ||
elif action == "shutdown": | ||
if self.parsed_args and "delay" not in action_args: | ||
action_args["delay"] = self.parsed_args.shutdown_delay | ||
|
||
single_match = None | ||
keys = list(rules_engine_result.data.keys()) | ||
if len(keys) == 0: | ||
single_match = {} | ||
elif len(keys) == 1 and keys[0] == "m": | ||
single_match = rules_engine_result.data[keys[0]] | ||
else: | ||
multi_match = rules_engine_result.data | ||
variables_copy = variables.copy() | ||
if single_match is not None: | ||
variables_copy["event"] = single_match | ||
event = single_match | ||
if "meta" in event: | ||
if "hosts" in event["meta"]: | ||
hosts = parse_hosts(event["meta"]["hosts"]) | ||
else: | ||
variables_copy["events"] = multi_match | ||
new_hosts = [] | ||
for event in variables_copy["events"].values(): | ||
if "meta" in event: | ||
if "hosts" in event["meta"]: | ||
new_hosts.extend(parse_hosts(event["meta"]["hosts"])) | ||
if new_hosts: | ||
hosts = new_hosts | ||
|
||
if "var_root" in action_args: | ||
var_root = action_args.pop("var_root") | ||
logger.debug( | ||
"Update variables [%s] with new root [%s]", | ||
variables_copy, | ||
var_root, | ||
) | ||
_update_variables(variables_copy, var_root) | ||
|
||
logger.debug( | ||
"substitute_variables [%s] [%s]", | ||
action_args, | ||
variables_copy, | ||
) | ||
action_args = { | ||
k: substitute_variables(v, variables_copy) | ||
for k, v in action_args.items() | ||
} | ||
logger.debug("action args: %s", action_args) | ||
|
||
if "ruleset" not in action_args: | ||
action_args["ruleset"] = metadata.rule_set | ||
|
||
control = Control( | ||
queue=self.event_log, | ||
inventory=inventory, | ||
hosts=hosts, | ||
variables=variables_copy, | ||
project_data_file=self.project_data_file, | ||
) | ||
|
||
return control, action_args, variables_copy | ||
|
||
async def _call_action( | ||
self, | ||
metadata: Metadata, | ||
|
@@ -368,80 +467,15 @@ async def _call_action( | |
error = None | ||
if action in ACTION_CLASSES: | ||
try: | ||
if ( | ||
action == "run_job_template" | ||
or action == "run_workflow_template" | ||
): | ||
limit = dpath.get( | ||
action_args, | ||
"job_args.limit", | ||
separator=".", | ||
default=None, | ||
) | ||
if isinstance(limit, list): | ||
hosts = limit | ||
elif isinstance(limit, str): | ||
hosts = [limit] | ||
elif action == "shutdown": | ||
if self.parsed_args and "delay" not in action_args: | ||
action_args["delay"] = self.parsed_args.shutdown_delay | ||
|
||
single_match = None | ||
keys = list(rules_engine_result.data.keys()) | ||
if len(keys) == 0: | ||
single_match = {} | ||
elif len(keys) == 1 and keys[0] == "m": | ||
single_match = rules_engine_result.data[keys[0]] | ||
else: | ||
multi_match = rules_engine_result.data | ||
variables_copy = variables.copy() | ||
if single_match is not None: | ||
variables_copy["event"] = single_match | ||
event = single_match | ||
if "meta" in event: | ||
if "hosts" in event["meta"]: | ||
hosts = parse_hosts(event["meta"]["hosts"]) | ||
else: | ||
variables_copy["events"] = multi_match | ||
new_hosts = [] | ||
for event in variables_copy["events"].values(): | ||
if "meta" in event: | ||
if "hosts" in event["meta"]: | ||
new_hosts.extend( | ||
parse_hosts(event["meta"]["hosts"]) | ||
) | ||
if new_hosts: | ||
hosts = new_hosts | ||
|
||
if "var_root" in action_args: | ||
var_root = action_args.pop("var_root") | ||
logger.debug( | ||
"Update variables [%s] with new root [%s]", | ||
variables_copy, | ||
var_root, | ||
) | ||
_update_variables(variables_copy, var_root) | ||
|
||
logger.debug( | ||
"substitute_variables [%s] [%s]", | ||
control, action_args, variables_copy = self._build_control( | ||
action, | ||
action_args, | ||
variables_copy, | ||
) | ||
action_args = { | ||
k: substitute_variables(v, variables_copy) | ||
for k, v in action_args.items() | ||
} | ||
logger.debug("action args: %s", action_args) | ||
|
||
if "ruleset" not in action_args: | ||
action_args["ruleset"] = metadata.rule_set | ||
|
||
control = Control( | ||
queue=self.event_log, | ||
inventory=inventory, | ||
hosts=hosts, | ||
variables=variables_copy, | ||
project_data_file=self.project_data_file, | ||
rules_engine_result, | ||
variables, | ||
metadata, | ||
inventory, | ||
hosts, | ||
) | ||
|
||
await ACTION_CLASSES[action]( | ||
|
@@ -481,6 +515,43 @@ async def _call_action( | |
except BaseException as e: | ||
logger.error(e) | ||
raise | ||
elif action_plugin := self.find_action(action): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @benthomasson are we going to have special names for the custom actions so they don't conflict with our builtin action names. If the name conflict exists we will only run the builtin and not run the custom actions. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @benthomasson Since the code is the same for the 2 find_actions call can we move it into a separate function
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @benthomasson doing this search with every action call might not make sense, we could load all the actions in when we read the ruleset. Which also allows us to ensure that all the actions are valid before we start accepting the events and running the rule engine. if the actions are missing we can do an early termination to indicate that actions are missing. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
We should support FQCN for actions. Names without collections prefixes are either builtin or loaded from a local directory. The precedence should be builtin and then local directory for non-FQCN. |
||
try: | ||
control, action_args, variables_copy = self._build_control( | ||
action, | ||
action_args, | ||
rules_engine_result, | ||
variables, | ||
metadata, | ||
inventory, | ||
hosts, | ||
) | ||
await action_plugin["main"](metadata, control, **action_args) | ||
except Exception as e: | ||
logger.error("Error calling action %s, err %s", action, str(e)) | ||
error = e | ||
except BaseException as e: | ||
logger.error(e) | ||
raise | ||
elif has_action(*split_collection_name(action)): | ||
action_plugin = find_action(*split_collection_name(action)) | ||
try: | ||
control, action_args, variables_copy = self._build_control( | ||
action, | ||
action_args, | ||
rules_engine_result, | ||
variables, | ||
metadata, | ||
inventory, | ||
hosts, | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @benthomasson do we have to send the result of the action_plugin to the AAP server, after it has finished execution. Will there be a post_event/set_fact at the end of this action. Would it make sense to extract out some of the logic from builtin which does the reporting for us. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We probably should send the result. Moving the reporting code out of builtin into a different module is a good idea. |
||
await action_plugin.main(metadata, control, **action_args) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @benthomasson Should we pass the metadata, control which are data classes as dictionaries into the action plugin so they can be tested outside of ansible rulebook otherwise they would have to know about the Metadata,Control data class definitions and if we change it it will effect the plugin. Also for the feedback should we be passing in a function that can they call to send the feedback to the eda-server. We should have a very loose coupling between action plugin and ansible-rulebook so each can be tested separately. |
||
except Exception as e: | ||
logger.error("Error calling action %s, err %s", action, str(e)) | ||
error = e | ||
except BaseException as e: | ||
logger.error(e) | ||
raise | ||
else: | ||
logger.error("Action %s not supported", action) | ||
error = UnsupportedActionException( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@benthomasson Can these go into the settings and it can be accessed from anywhere. We have been moving most of the passed in args into the settings.