Skip to content

Commit

Permalink
Add context alarms
Browse files Browse the repository at this point in the history
  • Loading branch information
fqqb committed Jun 29, 2024
1 parent 92b6d68 commit 14a538a
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 1 deletion.
20 changes: 20 additions & 0 deletions src/yamcs/pymdb/alarms.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
from __future__ import annotations

from collections.abc import Mapping
from enum import Enum, auto
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from yamcs.pymdb.expressions import Expression


class AlarmLevel(Enum):
Expand Down Expand Up @@ -79,6 +85,13 @@ def __init__(
"""


class EnumerationContextAlarm:

def __init__(self, context: Expression, alarm: EnumerationAlarm):
self.context = context
self.alarm = alarm


class ThresholdAlarm(Alarm):
"""
Alarm definition specifying the thresholds for detecting out of limit values.
Expand Down Expand Up @@ -138,3 +151,10 @@ def __init__(
self.warning_high_exclusive = warning_high_exclusive
self.watch_high = watch_high
self.watch_high_exclusive = watch_high_exclusive


class ThresholdContextAlarm:

def __init__(self, context: Expression, alarm: ThresholdAlarm):
self.context = context
self.alarm = alarm
13 changes: 13 additions & 0 deletions src/yamcs/pymdb/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from yamcs.pymdb.encodings import Encoding, TimeEncoding

if TYPE_CHECKING:
from yamcs.pymdb.alarms import EnumerationContextAlarm, ThresholdContextAlarm
from yamcs.pymdb.calibrators import Calibrator
from yamcs.pymdb.systems import System

Expand Down Expand Up @@ -374,6 +375,7 @@ def __init__(
name: str,
choices: Choices,
alarm: EnumerationAlarm | None = None,
context_alarms: Sequence[EnumerationContextAlarm] | None = None,
aliases: Mapping[str, str] | None = None,
data_source: DataSource = DataSource.TELEMETERED,
initial_value: Any = None,
Expand Down Expand Up @@ -406,6 +408,9 @@ def __init__(
self.alarm: EnumerationAlarm | None = alarm
"""Specification for alarm monitoring"""

self.context_alarms: list[EnumerationContextAlarm] = list(context_alarms or [])
"""Alarm specification when a specific context expression applies"""


class FloatParameter(Parameter, FloatDataType):
"""
Expand All @@ -432,6 +437,7 @@ def __init__(
encoding: Encoding | None = None,
calibrator: Calibrator | None = None,
alarm: ThresholdAlarm | None = None,
context_alarms: Sequence[ThresholdContextAlarm] | None = None,
) -> None:
FloatDataType.__init__(
self,
Expand Down Expand Up @@ -460,6 +466,9 @@ def __init__(
self.alarm: ThresholdAlarm | None = alarm
"""Specification for alarm monitoring"""

self.context_alarms: list[ThresholdContextAlarm] = list(context_alarms or [])
"""Alarm specification when a specific context expression applies"""


class IntegerParameter(Parameter, IntegerDataType):
"""
Expand All @@ -485,6 +494,7 @@ def __init__(
encoding: Encoding | None = None,
calibrator: Calibrator | None = None,
alarm: ThresholdAlarm | None = None,
context_alarms: Sequence[ThresholdContextAlarm] | None = None,
) -> None:
IntegerDataType.__init__(
self,
Expand Down Expand Up @@ -512,6 +522,9 @@ def __init__(
self.alarm: ThresholdAlarm | None = alarm
"""Specification for alarm monitoring"""

self.context_alarms: list[ThresholdContextAlarm] = list(context_alarms or [])
"""Alarm specification when a specific context expression applies"""


class StringParameter(Parameter, StringDataType):
"""
Expand Down
61 changes: 60 additions & 1 deletion src/yamcs/pymdb/xtce.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from typing import TYPE_CHECKING, Any, Sequence, cast
from xml.dom import minidom

from yamcs.pymdb.alarms import AlarmLevel, ThresholdAlarm
from yamcs.pymdb.alarms import AlarmLevel, ThresholdAlarm, ThresholdContextAlarm
from yamcs.pymdb.algorithms import (
Algorithm,
ContainerTrigger,
Expand Down Expand Up @@ -601,6 +601,7 @@ def add_parameter_type_set(self, parent: ET.Element, system: System):
name=parameter.name,
initial_value=parameter.initial_value,
alarm=parameter.alarm,
context_alarms=parameter.context_alarms,
data_type=parameter,
)
elif isinstance(parameter, IntegerParameter):
Expand All @@ -610,6 +611,7 @@ def add_parameter_type_set(self, parent: ET.Element, system: System):
name=parameter.name,
initial_value=parameter.initial_value,
alarm=parameter.alarm,
context_alarms=parameter.context_alarms,
data_type=parameter,
)
elif isinstance(parameter, StringParameter):
Expand Down Expand Up @@ -938,6 +940,7 @@ def add_aggregate_parameter_type(
name=member_type_name,
initial_value=member.initial_value,
alarm=None,
context_alarms=None,
data_type=member,
)
elif isinstance(member, IntegerMember):
Expand All @@ -947,6 +950,7 @@ def add_aggregate_parameter_type(
name=member_type_name,
initial_value=member.initial_value,
alarm=None,
context_alarms=None,
data_type=member,
)
elif isinstance(member, StringMember):
Expand Down Expand Up @@ -1045,6 +1049,7 @@ def add_array_parameter_type(
name=element_type_name,
initial_value=None,
alarm=None,
context_alarms=None,
data_type=el_type,
)
elif isinstance(el_type, IntegerDataType):
Expand All @@ -1054,6 +1059,7 @@ def add_array_parameter_type(
name=element_type_name,
initial_value=None,
alarm=None,
context_alarms=None,
data_type=el_type,
)
elif isinstance(el_type, StringDataType):
Expand Down Expand Up @@ -1202,13 +1208,37 @@ def add_enumerated_parameter_type(
state_el.attrib["alarmLevel"] = self.alarm_level_to_text(v)
state_el.attrib["enumerationLabel"] = k

if data_type.context_alarms:
list_el = ET.SubElement(el, "ContextAlarmList")
for context_alarm in data_type.context_alarms:
context_alarm_el = ET.SubElement(list_el, "ContextAlarm")
context_alarm_el.attrib["minViolations"] = str(
context_alarm.alarm.minimum_violations
)
context_alarm_el.attrib["defaultAlarmLevel"] = (
self.alarm_level_to_text(context_alarm.alarm.default_level)
)

states_el = ET.SubElement(context_alarm_el, "EnumerationAlarmList")
for k, v in context_alarm.alarm.states.items():
state_el = ET.SubElement(states_el, "EnumerationAlarm")
state_el.attrib["alarmLevel"] = self.alarm_level_to_text(v)
state_el.attrib["enumerationLabel"] = k

context_el = ET.SubElement(context_alarm_el, "ContextMatch")
condition_el = ET.SubElement(context_el, "BooleanExpression")
self.add_expression_condition(
condition_el, system, context_alarm.context
)

def add_float_parameter_type(
self,
parent: ET.Element,
system: System,
name: str,
initial_value: Any,
alarm: ThresholdAlarm | None,
context_alarms: Sequence[ThresholdContextAlarm] | None,
data_type: FloatDataType,
):
el = ET.SubElement(parent, "FloatParameterType")
Expand Down Expand Up @@ -1251,13 +1281,28 @@ def add_float_parameter_type(
alarm_el.attrib["minViolations"] = str(alarm.minimum_violations)
self.add_static_alarm_ranges(alarm_el, alarm)

if context_alarms:
list_el = ET.SubElement(el, "ContextAlarmList")
for context_alarm in context_alarms:
context_alarm_el = ET.SubElement(list_el, "ContextAlarm")
context_alarm_el.attrib["minViolations"] = str(
context_alarm.alarm.minimum_violations
)
self.add_static_alarm_ranges(context_alarm_el, context_alarm.alarm)
context_el = ET.SubElement(context_alarm_el, "ContextMatch")
condition_el = ET.SubElement(context_el, "BooleanExpression")
self.add_expression_condition(
condition_el, system, context_alarm.context
)

def add_integer_parameter_type(
self,
parent: ET.Element,
system: System,
name: str,
initial_value: Any,
alarm: ThresholdAlarm | None,
context_alarms: Sequence[ThresholdContextAlarm] | None,
data_type: IntegerDataType,
):
el = ET.SubElement(parent, "IntegerParameterType")
Expand Down Expand Up @@ -1294,6 +1339,20 @@ def add_integer_parameter_type(
alarm_el.attrib["minViolations"] = str(alarm.minimum_violations)
self.add_static_alarm_ranges(alarm_el, alarm)

if context_alarms:
list_el = ET.SubElement(el, "ContextAlarmList")
for context_alarm in context_alarms:
context_alarm_el = ET.SubElement(list_el, "ContextAlarm")
context_alarm_el.attrib["minViolations"] = str(
context_alarm.alarm.minimum_violations
)
self.add_static_alarm_ranges(context_alarm_el, context_alarm.alarm)
context_el = ET.SubElement(context_alarm_el, "ContextMatch")
condition_el = ET.SubElement(context_el, "BooleanExpression")
self.add_expression_condition(
condition_el, system, context_alarm.context
)

def add_string_parameter_type(
self,
parent: ET.Element,
Expand Down

0 comments on commit 14a538a

Please sign in to comment.