diff --git a/space_packet_parser/definitions.py b/space_packet_parser/definitions.py index b2aa568..08ec521 100644 --- a/space_packet_parser/definitions.py +++ b/space_packet_parser/definitions.py @@ -87,6 +87,8 @@ def __init__( self.type_tag_to_object = {k.format(**self.ns): v for k, v in self._tag_to_type_template.items()} + self._populate_parameter_type_cache() + self._populate_parameter_cache() self._populate_sequence_container_cache() def __getitem__(self, item): @@ -104,6 +106,59 @@ def _populate_sequence_container_cache(self): if sc.base_container_name: self._sequence_container_cache[sc.base_container_name].inheritors.append(name) + def _populate_parameter_type_cache(self): + """Force populating parameter_type_cache by parsing all ParameterTypes""" + for parameter_type_element in self.parameter_type_set.findall('./*', self.ns): + parameter_type_name = parameter_type_element.attrib['name'] + if parameter_type_name in self._parameter_cache: + raise ValueError(f"Found duplicate parameter type {parameter_type_name}. " + f"Parameter types are expected to be unique") + try: + parameter_type_class = self.type_tag_to_object[parameter_type_element.tag] + except KeyError as e: + if ( + "ArrayParameterType" in parameter_type_element.tag or + "AggregateParameterType" in parameter_type_element.tag + ): + raise NotImplementedError(f"Unsupported parameter type {parameter_type_element.tag}. " + "Supporting this parameter type is in the roadmap but has " + "not yet been implemented.") from e + raise InvalidParameterTypeError(f"Invalid parameter type {parameter_type_element.tag}. " + "If you believe this is a valid XTCE parameter type, " + "please open a feature request as a Github issue with a " + "reference to the XTCE element description for the " + "parameter type element.") from e + parameter_type_object = parameter_type_class.from_parameter_type_xml_element( + parameter_type_element, self.ns) + self._parameter_type_cache[parameter_type_name] = parameter_type_object # Add to cache + + def _populate_parameter_cache(self): + """Force populating parameter_cache by parsing all Parameters""" + for parameter_element in self.parameter_set.findall("./xtce:Parameter", self.ns): + parameter_name = parameter_element.attrib['name'] + if parameter_name in self._parameter_cache: + raise ValueError(f"Found duplicate parameter name {parameter_name}. " + f"Parameters are expected to be unique") + parameter_type_name = parameter_element.attrib['parameterTypeRef'] + + # Lookup from within the parameter type cache + parameter_type_object = self._parameter_type_cache[parameter_type_name] + + parameter_short_description = parameter_element.attrib['shortDescription'] if ( + 'shortDescription' in parameter_element.attrib + ) else None + parameter_long_description = parameter_element.find('xtce:LongDescription', self.ns).text if ( + parameter_element.find('xtce:LongDescription', self.ns) is not None + ) else None + + parameter_object = parameters.Parameter( + name=parameter_name, + parameter_type=parameter_type_object, + short_description=parameter_short_description, + long_description=parameter_long_description + ) + self._parameter_cache[parameter_name] = parameter_object # Add to cache + def parse_sequence_container_contents(self, sequence_container: ElementTree.Element) -> packets.SequenceContainer: """Parses the list of parameters in a SequenceContainer element, recursively parsing nested SequenceContainers @@ -136,53 +191,8 @@ def parse_sequence_container_contents(self, for entry in container_contents: if entry.tag == '{{{xtce}}}ParameterRefEntry'.format(**self.ns): # pylint: disable=consider-using-f-string parameter_name = entry.attrib['parameterRef'] + entry_list.append(self._parameter_cache[parameter_name]) - # If we've already parsed this parameter in a different container - if parameter_name in self._parameter_cache: - entry_list.append(self._parameter_cache[parameter_name]) - else: - parameter_element = self._find_parameter(parameter_name) - parameter_type_name = parameter_element.attrib['parameterTypeRef'] - - # If we've already parsed this parameter type for a different parameter - if parameter_type_name in self._parameter_type_cache: - parameter_type_object = self._parameter_type_cache[parameter_type_name] - else: - parameter_type_element = self._find_parameter_type(parameter_type_name) - try: - parameter_type_class = self.type_tag_to_object[parameter_type_element.tag] - except KeyError as e: - if ( - "ArrayParameterType" in parameter_type_element.tag or - "AggregateParameterType" in parameter_type_element.tag - ): - raise NotImplementedError(f"Unsupported parameter type {parameter_type_element.tag}. " - "Supporting this parameter type is in the roadmap but has " - "not yet been implemented.") from e - raise InvalidParameterTypeError(f"Invalid parameter type {parameter_type_element.tag}. " - "If you believe this is a valid XTCE parameter type, " - "please open a feature request as a Github issue with a " - "reference to the XTCE element description for the " - "parameter type element.") from e - parameter_type_object = parameter_type_class.from_parameter_type_xml_element( - parameter_type_element, self.ns) - self._parameter_type_cache[parameter_type_name] = parameter_type_object # Add to cache - - parameter_short_description = parameter_element.attrib['shortDescription'] if ( - 'shortDescription' in parameter_element.attrib - ) else None - parameter_long_description = parameter_element.find('xtce:LongDescription', self.ns).text if ( - parameter_element.find('xtce:LongDescription', self.ns) is not None - ) else None - - parameter_object = parameters.Parameter( - name=parameter_name, - parameter_type=parameter_type_object, - short_description=parameter_short_description, - long_description=parameter_long_description - ) - entry_list.append(parameter_object) - self._parameter_cache[parameter_name] = parameter_object # Add to cache elif entry.tag == '{{{xtce}}}ContainerRefEntry'.format( # pylint: disable=consider-using-f-string **self.ns): nested_container = self._find_container(name=entry.attrib['containerRef']) @@ -268,40 +278,6 @@ def _find_container(self, name: str) -> ElementTree.Element: f"Container names are expected to exist and be unique." return containers[0] - def _find_parameter(self, name: str) -> ElementTree.Element: - """Finds an XTCE Parameter in the tree. - - Parameters - ---------- - name : str - Name of the parameter to find - - Returns - ------- - : ElementTree.Element - """ - params = self.parameter_set.findall(f"./xtce:Parameter[@name='{name}']", self.ns) - assert len(params) == 1, f"Found {len(params)} matching parameters with name {name}. " \ - f"Parameter names are expected to exist and be unique." - return params[0] - - def _find_parameter_type(self, name: str) -> ElementTree.Element: - """Finds an XTCE ParameterType in the tree. - - Parameters - ---------- - name : str - Name of the parameter type to find - - Returns - ------- - : ElementTree.Element - """ - param_types = self.parameter_type_set.findall(f"./*[@name='{name}']", self.ns) - assert len(param_types) == 1, f"Found {len(param_types)} matching parameter types with name {name}. " \ - f"Parameter type names are expected to exist and be unique." - return param_types[0] - def _get_container_base_container( self, container_element: ElementTree.Element) -> Tuple[ElementTree.Element, List[comparisons.MatchCriteria]]: