Skip to content

Commit

Permalink
Refactor Action message
Browse files Browse the repository at this point in the history
  • Loading branch information
benthomasson committed Aug 9, 2023
1 parent a90363d commit 9143c39
Show file tree
Hide file tree
Showing 2 changed files with 199 additions and 150 deletions.
318 changes: 169 additions & 149 deletions ansible_rulebook/builtin.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
WorkflowJobTemplateNotFoundException,
)
from .job_template_runner import job_template_runner
from .messages import Shutdown
from .messages import Shutdown, Action, serialize
from .util import get_horizontal_rule, run_at

logger = logging.getLogger(__name__)
Expand All @@ -70,19 +70,21 @@ async def none(
ruleset: str,
):
await event_log.put(
dict(
type="Action",
action="noop",
action_uuid=str(uuid.uuid4()),
ruleset=source_ruleset_name,
ruleset_uuid=source_ruleset_uuid,
rule=source_rule_name,
rule_uuid=source_rule_uuid,
activation_id=settings.identifier,
run_at=run_at(),
status=INTERNAL_ACTION_STATUS,
matching_events=_get_events(variables),
rule_run_at=rule_run_at,
serialize(
Action(
type="Action",
action="noop",
action_uuid=str(uuid.uuid4()),
ruleset=source_ruleset_name,
ruleset_uuid=source_ruleset_uuid,
rule=source_rule_name,
rule_uuid=source_rule_uuid,
activation_id=settings.identifier,
run_at=run_at(),
status=INTERNAL_ACTION_STATUS,
matching_events=_get_events(variables),
rule_run_at=rule_run_at,
)
)
)

Expand Down Expand Up @@ -111,20 +113,22 @@ async def debug(event_log, **kwargs):
print(get_horizontal_rule("="))
sys.stdout.flush()
await event_log.put(
dict(
type="Action",
action="debug",
action_uuid=str(uuid.uuid4()),
playbook_name=kwargs.get("name"),
ruleset=kwargs.get("source_ruleset_name"),
ruleset_uuid=kwargs.get("source_ruleset_uuid"),
rule=kwargs.get("source_rule_name"),
rule_uuid=kwargs.get("source_rule_uuid"),
rule_run_at=kwargs.get("rule_run_at"),
activation_id=settings.identifier,
run_at=run_at(),
status=INTERNAL_ACTION_STATUS,
matching_events=_get_events(kwargs.get("variables")),
serialize(
Action(
type="Action",
action="debug",
action_uuid=str(uuid.uuid4()),
playbook_name=kwargs.get("name"),
ruleset=kwargs.get("source_ruleset_name"),
ruleset_uuid=kwargs.get("source_ruleset_uuid"),
rule=kwargs.get("source_rule_name"),
rule_uuid=kwargs.get("source_rule_uuid"),
rule_run_at=kwargs.get("rule_run_at"),
activation_id=settings.identifier,
run_at=run_at(),
status=INTERNAL_ACTION_STATUS,
matching_events=_get_events(kwargs.get("variables")),
)
)
)

Expand Down Expand Up @@ -153,20 +157,22 @@ async def print_event(
print_fn(variables[var_name])
sys.stdout.flush()
await event_log.put(
dict(
type="Action",
action="print_event",
action_uuid=str(uuid.uuid4()),
activation_id=settings.identifier,
ruleset=source_ruleset_name,
ruleset_uuid=source_ruleset_uuid,
rule=source_rule_name,
rule_uuid=source_rule_uuid,
playbook_name=name,
run_at=run_at(),
status=INTERNAL_ACTION_STATUS,
matching_events=_get_events(variables),
rule_run_at=rule_run_at,
serialize(
Action(
type="Action",
action="print_event",
action_uuid=str(uuid.uuid4()),
activation_id=settings.identifier,
ruleset=source_ruleset_name,
ruleset_uuid=source_ruleset_uuid,
rule=source_rule_name,
rule_uuid=source_rule_uuid,
playbook_name=name,
run_at=run_at(),
status=INTERNAL_ACTION_STATUS,
matching_events=_get_events(variables),
rule_run_at=rule_run_at,
)
)
)

Expand All @@ -189,20 +195,22 @@ async def set_fact(
logger.debug("set_fact %s %s", ruleset, fact)
lang.assert_fact(ruleset, _embellish_internal_event(fact, "set_fact"))
await event_log.put(
dict(
type="Action",
action="set_fact",
action_uuid=str(uuid.uuid4()),
activation_id=settings.identifier,
ruleset=source_ruleset_name,
ruleset_uuid=source_ruleset_uuid,
rule=source_rule_name,
rule_uuid=source_rule_uuid,
playbook_name=name,
run_at=run_at(),
status=INTERNAL_ACTION_STATUS,
matching_events=_get_events(variables),
rule_run_at=rule_run_at,
serialize(
Action(
type="Action",
action="set_fact",
action_uuid=str(uuid.uuid4()),
activation_id=settings.identifier,
ruleset=source_ruleset_name,
ruleset_uuid=source_ruleset_uuid,
rule=source_rule_name,
rule_uuid=source_rule_uuid,
playbook_name=name,
run_at=run_at(),
status=INTERNAL_ACTION_STATUS,
matching_events=_get_events(variables),
rule_run_at=rule_run_at,
)
)
)

Expand Down Expand Up @@ -231,20 +239,22 @@ async def retract_fact(

lang.retract_matching_facts(ruleset, fact, partial, exclude_keys)
await event_log.put(
dict(
type="Action",
action="retract_fact",
action_uuid=str(uuid.uuid4()),
ruleset=source_ruleset_name,
ruleset_uuid=source_ruleset_uuid,
rule=source_rule_name,
rule_uuid=source_rule_uuid,
activation_id=settings.identifier,
playbook_name=name,
run_at=run_at(),
status=INTERNAL_ACTION_STATUS,
matching_events=_get_events(variables),
rule_run_at=rule_run_at,
serialize(
Action(
type="Action",
action="retract_fact",
action_uuid=str(uuid.uuid4()),
ruleset=source_ruleset_name,
ruleset_uuid=source_ruleset_uuid,
rule=source_rule_name,
rule_uuid=source_rule_uuid,
activation_id=settings.identifier,
playbook_name=name,
run_at=run_at(),
status=INTERNAL_ACTION_STATUS,
matching_events=_get_events(variables),
rule_run_at=rule_run_at,
)
)
)

Expand All @@ -266,19 +276,21 @@ async def post_event(
lang.post(ruleset, _embellish_internal_event(event, "post_event"))

await event_log.put(
dict(
type="Action",
action="post_event",
action_uuid=str(uuid.uuid4()),
ruleset=source_ruleset_name,
ruleset_uuid=source_ruleset_uuid,
rule=source_rule_name,
rule_uuid=source_rule_uuid,
activation_id=settings.identifier,
run_at=run_at(),
status=INTERNAL_ACTION_STATUS,
matching_events=_get_events(variables),
rule_run_at=rule_run_at,
serialize(
Action(
type="Action",
action="post_event",
action_uuid=str(uuid.uuid4()),
ruleset=source_ruleset_name,
ruleset_uuid=source_ruleset_uuid,
rule=source_rule_name,
rule_uuid=source_rule_uuid,
activation_id=settings.identifier,
run_at=run_at(),
status=INTERNAL_ACTION_STATUS,
matching_events=_get_events(variables),
rule_run_at=rule_run_at,
)
)
)

Expand Down Expand Up @@ -685,22 +697,24 @@ async def post_process_runner(
error_message = _get_latest_artifact(private_data_dir, "stdout")
logger.error(error_message)

result = dict(
type="Action",
action=action,
action_uuid=str(uuid.uuid4()),
activation_id=activation_id,
playbook_name=name,
job_id=job_id,
ruleset=ruleset,
ruleset_uuid=ruleset_uuid,
rule=rule,
rule_uuid=rule_uuid,
rc=rc,
status=status,
run_at=run_at,
matching_events=_get_events(variables),
rule_run_at=rule_run_at,
result = serialize(
Action(
type="Action",
action=action,
action_uuid=str(uuid.uuid4()),
activation_id=activation_id,
playbook_name=name,
job_id=job_id,
ruleset=ruleset,
ruleset_uuid=ruleset_uuid,
rule=rule,
rule_uuid=rule_uuid,
rc=rc,
status=status,
run_at=run_at,
matching_events=_get_events(variables),
rule_run_at=rule_run_at,
)
)
await event_log.put(result)

Expand Down Expand Up @@ -806,23 +820,25 @@ async def run_job_template(
controller_job["created"] = run_at()
controller_job["error"] = str(ex)

a_log = dict(
type="Action",
action="run_job_template",
action_uuid=str(uuid.uuid4()),
activation_id=settings.identifier,
job_template_name=name,
organization=organization,
job_id=job_id,
ruleset=ruleset,
ruleset_uuid=source_ruleset_uuid,
rule=source_rule_name,
rule_uuid=source_rule_uuid,
status=controller_job["status"],
run_at=controller_job["created"],
url=_controller_job_url(controller_job),
matching_events=_get_events(variables),
rule_run_at=rule_run_at,
a_log = serialize(
Action(
type="Action",
action="run_job_template",
action_uuid=str(uuid.uuid4()),
activation_id=settings.identifier,
job_template_name=name,
organization=organization,
job_id=job_id,
ruleset=ruleset,
ruleset_uuid=source_ruleset_uuid,
rule=source_rule_name,
rule_uuid=source_rule_uuid,
status=controller_job["status"],
run_at=controller_job["created"],
url=_controller_job_url(controller_job),
matching_events=_get_events(variables),
rule_run_at=rule_run_at,
)
)
if "error" in controller_job:
a_log["message"] = controller_job["error"]
Expand Down Expand Up @@ -925,23 +941,25 @@ async def run_workflow_template(
controller_job["created"] = run_at()
controller_job["error"] = str(ex)

a_log = dict(
type="Action",
action="run_workflow_template",
action_uuid=str(uuid.uuid4()),
activation_id=settings.identifier,
job_template_name=name,
organization=organization,
job_id=job_id,
ruleset=ruleset,
ruleset_uuid=source_ruleset_uuid,
rule=source_rule_name,
rule_uuid=source_rule_uuid,
status=controller_job["status"],
run_at=controller_job["created"],
url=_controller_job_url(controller_job),
matching_events=_get_events(variables),
rule_run_at=rule_run_at,
a_log = serialize(
Action(
type="Action",
action="run_workflow_template",
action_uuid=str(uuid.uuid4()),
activation_id=settings.identifier,
job_template_name=name,
organization=organization,
job_id=job_id,
ruleset=ruleset,
ruleset_uuid=source_ruleset_uuid,
rule=source_rule_name,
rule_uuid=source_rule_uuid,
status=controller_job["status"],
run_at=controller_job["created"],
url=_controller_job_url(controller_job),
matching_events=_get_events(variables),
rule_run_at=rule_run_at,
)
)
if "error" in controller_job:
a_log["message"] = controller_job["error"]
Expand Down Expand Up @@ -988,22 +1006,24 @@ async def shutdown(
kind: str = "graceful",
):
await event_log.put(
dict(
type="Action",
action="shutdown",
action_uuid=str(uuid.uuid4()),
activation_id=settings.identifier,
ruleset=source_ruleset_name,
ruleset_uuid=source_ruleset_uuid,
rule=source_rule_name,
rule_uuid=source_rule_uuid,
run_at=run_at(),
status=INTERNAL_ACTION_STATUS,
matching_events=_get_events(variables),
delay=delay,
message=message,
kind=kind,
rule_run_at=rule_run_at,
serialize(
Action(
type="Action",
action="shutdown",
action_uuid=str(uuid.uuid4()),
activation_id=settings.identifier,
ruleset=source_ruleset_name,
ruleset_uuid=source_ruleset_uuid,
rule=source_rule_name,
rule_uuid=source_rule_uuid,
run_at=run_at(),
status=INTERNAL_ACTION_STATUS,
matching_events=_get_events(variables),
delay=delay,
message=message,
kind=kind,
rule_run_at=rule_run_at,
)
)
)

Expand Down
Loading

0 comments on commit 9143c39

Please sign in to comment.