diff --git a/src/yamcs/pymdb/alarms.py b/src/yamcs/pymdb/alarms.py index 8834023..a48fdf3 100644 --- a/src/yamcs/pymdb/alarms.py +++ b/src/yamcs/pymdb/alarms.py @@ -1,5 +1,11 @@ +from __future__ import annotations + from collections.abc import Mapping from enum import Enum, auto +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from yamcs.pymdb.expressions import Expression class AlarmLevel(Enum): @@ -79,6 +85,13 @@ def __init__( """ +class EnumerationContextAlarm: + + def __init__(self, context: Expression, alarm: EnumerationAlarm): + self.context = context + self.alarm = alarm + + class ThresholdAlarm(Alarm): """ Alarm definition specifying the thresholds for detecting out of limit values. @@ -138,3 +151,10 @@ def __init__( self.warning_high_exclusive = warning_high_exclusive self.watch_high = watch_high self.watch_high_exclusive = watch_high_exclusive + + +class ThresholdContextAlarm: + + def __init__(self, context: Expression, alarm: ThresholdAlarm): + self.context = context + self.alarm = alarm diff --git a/src/yamcs/pymdb/parameters.py b/src/yamcs/pymdb/parameters.py index 2f7deb8..69de7bb 100644 --- a/src/yamcs/pymdb/parameters.py +++ b/src/yamcs/pymdb/parameters.py @@ -25,6 +25,7 @@ from yamcs.pymdb.encodings import Encoding, TimeEncoding if TYPE_CHECKING: + from yamcs.pymdb.alarms import EnumerationContextAlarm, ThresholdContextAlarm from yamcs.pymdb.calibrators import Calibrator from yamcs.pymdb.systems import System @@ -374,6 +375,7 @@ def __init__( name: str, choices: Choices, alarm: EnumerationAlarm | None = None, + context_alarms: Sequence[EnumerationContextAlarm] | None = None, aliases: Mapping[str, str] | None = None, data_source: DataSource = DataSource.TELEMETERED, initial_value: Any = None, @@ -406,6 +408,9 @@ def __init__( self.alarm: EnumerationAlarm | None = alarm """Specification for alarm monitoring""" + self.context_alarms: list[EnumerationContextAlarm] = list(context_alarms or []) + """Alarm specification when a specific context expression applies""" + class FloatParameter(Parameter, FloatDataType): """ @@ -432,6 +437,7 @@ def __init__( encoding: Encoding | None = None, calibrator: Calibrator | None = None, alarm: ThresholdAlarm | None = None, + context_alarms: Sequence[ThresholdContextAlarm] | None = None, ) -> None: FloatDataType.__init__( self, @@ -460,6 +466,9 @@ def __init__( self.alarm: ThresholdAlarm | None = alarm """Specification for alarm monitoring""" + self.context_alarms: list[ThresholdContextAlarm] = list(context_alarms or []) + """Alarm specification when a specific context expression applies""" + class IntegerParameter(Parameter, IntegerDataType): """ @@ -485,6 +494,7 @@ def __init__( encoding: Encoding | None = None, calibrator: Calibrator | None = None, alarm: ThresholdAlarm | None = None, + context_alarms: Sequence[ThresholdContextAlarm] | None = None, ) -> None: IntegerDataType.__init__( self, @@ -512,6 +522,9 @@ def __init__( self.alarm: ThresholdAlarm | None = alarm """Specification for alarm monitoring""" + self.context_alarms: list[ThresholdContextAlarm] = list(context_alarms or []) + """Alarm specification when a specific context expression applies""" + class StringParameter(Parameter, StringDataType): """ diff --git a/src/yamcs/pymdb/xtce.py b/src/yamcs/pymdb/xtce.py index 2dcc24a..cc742cf 100644 --- a/src/yamcs/pymdb/xtce.py +++ b/src/yamcs/pymdb/xtce.py @@ -9,7 +9,7 @@ from typing import TYPE_CHECKING, Any, Sequence, cast from xml.dom import minidom -from yamcs.pymdb.alarms import AlarmLevel, ThresholdAlarm +from yamcs.pymdb.alarms import AlarmLevel, ThresholdAlarm, ThresholdContextAlarm from yamcs.pymdb.algorithms import ( Algorithm, ContainerTrigger, @@ -601,6 +601,7 @@ def add_parameter_type_set(self, parent: ET.Element, system: System): name=parameter.name, initial_value=parameter.initial_value, alarm=parameter.alarm, + context_alarms=parameter.context_alarms, data_type=parameter, ) elif isinstance(parameter, IntegerParameter): @@ -610,6 +611,7 @@ def add_parameter_type_set(self, parent: ET.Element, system: System): name=parameter.name, initial_value=parameter.initial_value, alarm=parameter.alarm, + context_alarms=parameter.context_alarms, data_type=parameter, ) elif isinstance(parameter, StringParameter): @@ -938,6 +940,7 @@ def add_aggregate_parameter_type( name=member_type_name, initial_value=member.initial_value, alarm=None, + context_alarms=None, data_type=member, ) elif isinstance(member, IntegerMember): @@ -947,6 +950,7 @@ def add_aggregate_parameter_type( name=member_type_name, initial_value=member.initial_value, alarm=None, + context_alarms=None, data_type=member, ) elif isinstance(member, StringMember): @@ -1045,6 +1049,7 @@ def add_array_parameter_type( name=element_type_name, initial_value=None, alarm=None, + context_alarms=None, data_type=el_type, ) elif isinstance(el_type, IntegerDataType): @@ -1054,6 +1059,7 @@ def add_array_parameter_type( name=element_type_name, initial_value=None, alarm=None, + context_alarms=None, data_type=el_type, ) elif isinstance(el_type, StringDataType): @@ -1202,6 +1208,29 @@ def add_enumerated_parameter_type( state_el.attrib["alarmLevel"] = self.alarm_level_to_text(v) state_el.attrib["enumerationLabel"] = k + if data_type.context_alarms: + list_el = ET.SubElement(el, "ContextAlarmList") + for context_alarm in data_type.context_alarms: + context_alarm_el = ET.SubElement(list_el, "ContextAlarm") + context_alarm_el.attrib["minViolations"] = str( + context_alarm.alarm.minimum_violations + ) + context_alarm_el.attrib["defaultAlarmLevel"] = ( + self.alarm_level_to_text(context_alarm.alarm.default_level) + ) + + states_el = ET.SubElement(context_alarm_el, "EnumerationAlarmList") + for k, v in context_alarm.alarm.states.items(): + state_el = ET.SubElement(states_el, "EnumerationAlarm") + state_el.attrib["alarmLevel"] = self.alarm_level_to_text(v) + state_el.attrib["enumerationLabel"] = k + + context_el = ET.SubElement(context_alarm_el, "ContextMatch") + condition_el = ET.SubElement(context_el, "BooleanExpression") + self.add_expression_condition( + condition_el, system, context_alarm.context + ) + def add_float_parameter_type( self, parent: ET.Element, @@ -1209,6 +1238,7 @@ def add_float_parameter_type( name: str, initial_value: Any, alarm: ThresholdAlarm | None, + context_alarms: Sequence[ThresholdContextAlarm] | None, data_type: FloatDataType, ): el = ET.SubElement(parent, "FloatParameterType") @@ -1251,6 +1281,20 @@ def add_float_parameter_type( alarm_el.attrib["minViolations"] = str(alarm.minimum_violations) self.add_static_alarm_ranges(alarm_el, alarm) + if context_alarms: + list_el = ET.SubElement(el, "ContextAlarmList") + for context_alarm in context_alarms: + context_alarm_el = ET.SubElement(list_el, "ContextAlarm") + context_alarm_el.attrib["minViolations"] = str( + context_alarm.alarm.minimum_violations + ) + self.add_static_alarm_ranges(context_alarm_el, context_alarm.alarm) + context_el = ET.SubElement(context_alarm_el, "ContextMatch") + condition_el = ET.SubElement(context_el, "BooleanExpression") + self.add_expression_condition( + condition_el, system, context_alarm.context + ) + def add_integer_parameter_type( self, parent: ET.Element, @@ -1258,6 +1302,7 @@ def add_integer_parameter_type( name: str, initial_value: Any, alarm: ThresholdAlarm | None, + context_alarms: Sequence[ThresholdContextAlarm] | None, data_type: IntegerDataType, ): el = ET.SubElement(parent, "IntegerParameterType") @@ -1294,6 +1339,20 @@ def add_integer_parameter_type( alarm_el.attrib["minViolations"] = str(alarm.minimum_violations) self.add_static_alarm_ranges(alarm_el, alarm) + if context_alarms: + list_el = ET.SubElement(el, "ContextAlarmList") + for context_alarm in context_alarms: + context_alarm_el = ET.SubElement(list_el, "ContextAlarm") + context_alarm_el.attrib["minViolations"] = str( + context_alarm.alarm.minimum_violations + ) + self.add_static_alarm_ranges(context_alarm_el, context_alarm.alarm) + context_el = ET.SubElement(context_alarm_el, "ContextMatch") + condition_el = ET.SubElement(context_el, "BooleanExpression") + self.add_expression_condition( + condition_el, system, context_alarm.context + ) + def add_string_parameter_type( self, parent: ET.Element,