More Cleanup #109

Merged
merged 21 commits on Dec 2, 2024
Changes from 18 commits
2 changes: 1 addition & 1 deletion airbrakes/airbrakes.py
@@ -56,7 +56,7 @@ def __init__(
:param servo: The servo object that controls the extension of the airbrakes. This can be a
real servo or a mock servo.
:param imu: The IMU object that reads data from the rocket's IMU. This can be a real IMU or
-       a mock IMU.
+        a mock IMU.
:param logger: The logger object that logs data to a CSV file.
:param data_processor: The data processor object that processes IMU data on a higher level.
:param apogee_predictor: The apogee predictor object that predicts the apogee of the rocket.
19 changes: 11 additions & 8 deletions airbrakes/data_handling/apogee_predictor.py
@@ -12,11 +12,11 @@

from airbrakes.data_handling.processed_data_packet import ProcessedDataPacket
from constants import (
-    APOGEE_PREDICTION_FREQUENCY,
+    APOGEE_PREDICTION_MIN_PACKETS,
CURVE_FIT_INITIAL,
FLIGHT_LENGTH_SECONDS,
-    GRAVITY,
-    INTEGRATION_TIME_STEP,
+    GRAVITY_METERS_PER_SECOND_SQUARED,
+    INTEGRATION_TIME_STEP_SECONDS,
STOP_SIGNAL,
UNCERTAINTY_THRESHOLD,
)
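Note: the renamed constants now carry their units in the name. The actual values live in constants.py and are not part of this diff; a hedged sketch of what the relevant entries might look like (values are illustrative assumptions only):

```python
# Hypothetical constants.py excerpt. Names come from this PR; values are
# assumptions for illustration, not the project's real numbers.
GRAVITY_METERS_PER_SECOND_SQUARED = 9.80665  # standard gravity
INTEGRATION_TIME_STEP_SECONDS = 0.01  # dt for the numeric integration below
APOGEE_PREDICTION_MIN_PACKETS = 100  # new packets required before re-fitting
```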
@@ -177,21 +177,24 @@ def _update_prediction_lookup_table(self, curve_coefficients: CurveCoefficients)
# altitude.

# This is all the x values that we will use to integrate the acceleration function
-        predicted_coast_timestamps = np.arange(0, FLIGHT_LENGTH_SECONDS, INTEGRATION_TIME_STEP)
+        predicted_coast_timestamps = np.arange(
+            0, FLIGHT_LENGTH_SECONDS, INTEGRATION_TIME_STEP_SECONDS
+        )

predicted_accelerations = (
self._curve_fit_function(
predicted_coast_timestamps, curve_coefficients.A, curve_coefficients.B
)
-            - GRAVITY
+            - GRAVITY_METERS_PER_SECOND_SQUARED
)
predicted_velocities = (
-            np.cumsum(predicted_accelerations) * INTEGRATION_TIME_STEP + self._initial_velocity
+            np.cumsum(predicted_accelerations) * INTEGRATION_TIME_STEP_SECONDS
+            + self._initial_velocity
)
# We don't care about velocity values less than 0 as those correspond with the rocket
# falling
predicted_velocities = predicted_velocities[predicted_velocities >= 0]
-        predicted_altitudes = np.cumsum(predicted_velocities) * INTEGRATION_TIME_STEP
+        predicted_altitudes = np.cumsum(predicted_velocities) * INTEGRATION_TIME_STEP_SECONDS
predicted_apogee = np.max(predicted_altitudes)
# We need to flip the lookup table because the velocities are in descending order, not
# ascending order. We need them to be in ascending order for the interpolation to work.
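Note: to make the integration above concrete, here is a minimal, self-contained sketch of the same cumulative-sum (rectangle-rule) scheme: the fitted acceleration minus gravity is integrated once for velocity and once more for altitude, and the peak altitude is the predicted apogee. The exponential drag model and all constant values below are stand-ins, not the project's actual ones:

```python
import numpy as np

# Stand-in values; the real constants live in constants.py.
FLIGHT_LENGTH_SECONDS = 30.0
INTEGRATION_TIME_STEP_SECONDS = 0.01
GRAVITY_METERS_PER_SECOND_SQUARED = 9.80665

def predict_apogee(curve_fit_function, a_coeff, b_coeff, initial_velocity):
    # x values over the coast phase:
    timestamps = np.arange(0, FLIGHT_LENGTH_SECONDS, INTEGRATION_TIME_STEP_SECONDS)
    # Net vertical acceleration is the fitted curve minus gravity.
    accelerations = (
        curve_fit_function(timestamps, a_coeff, b_coeff)
        - GRAVITY_METERS_PER_SECOND_SQUARED
    )
    # First integration: acceleration -> velocity.
    velocities = np.cumsum(accelerations) * INTEGRATION_TIME_STEP_SECONDS + initial_velocity
    # Negative velocities correspond to the rocket falling; drop them.
    velocities = velocities[velocities >= 0]
    # Second integration: velocity -> altitude; apogee is the maximum.
    altitudes = np.cumsum(velocities) * INTEGRATION_TIME_STEP_SECONDS
    return np.max(altitudes)

# Example with a hypothetical exponential-decay drag curve:
print(predict_apogee(lambda t, a, b: a * np.exp(-b * t), 40.0, 0.3, 250.0))
```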
@@ -223,7 +226,7 @@ def _prediction_loop(self) -> None:

self._extract_processed_data_packets(data_packets)

-        if len(self._accelerations) - last_run_length >= APOGEE_PREDICTION_FREQUENCY:
+        if len(self._accelerations) - last_run_length >= APOGEE_PREDICTION_MIN_PACKETS:
self._cumulative_time_differences = np.cumsum(self._time_differences)
# We only want to keep curve fitting if the curve fit hasn't converged yet
if not self._has_apogee_converged:
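Note: the rename from APOGEE_PREDICTION_FREQUENCY to APOGEE_PREDICTION_MIN_PACKETS makes the gating intent clearer: the fit re-runs only after enough new packets have accumulated since the last run, rather than on a fixed schedule. A small runnable sketch of that cadence, using made-up batches and a stand-in fit function:

```python
APOGEE_PREDICTION_MIN_PACKETS = 100  # assumed value, not the project's

def run_curve_fit(samples: list[float]) -> None:
    print(f"fitting on {len(samples)} samples")  # stand-in for the real fit

accelerations: list[float] = []
last_run_length = 0
for batch in ([0.1] * 60, [0.2] * 60, [0.3] * 60):  # fake packet batches
    accelerations.extend(batch)
    # Re-fit only once enough new samples have arrived since the last run.
    if len(accelerations) - last_run_length >= APOGEE_PREDICTION_MIN_PACKETS:
        run_curve_fit(accelerations)
        last_run_length = len(accelerations)
```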
7 changes: 5 additions & 2 deletions airbrakes/data_handling/data_processor.py
@@ -8,7 +8,7 @@

from airbrakes.data_handling.imu_data_packet import EstimatedDataPacket
from airbrakes.data_handling.processed_data_packet import ProcessedDataPacket
-from constants import ACCELERATION_NOISE_THRESHOLD, GRAVITY
+from constants import ACCEL_DEADBAND_METERS_PER_SECOND_SQUARED, GRAVITY_METERS_PER_SECOND_SQUARED
from utils import deadband


@@ -259,7 +259,10 @@ def _calculate_vertical_velocity(self) -> npt.NDArray[np.float64]:
# subtracted from vertical acceleration, then deadbanded.
vertical_accelerations = np.array(
[
-                deadband(vertical_acceleration - GRAVITY, ACCELERATION_NOISE_THRESHOLD)
+                deadband(
+                    vertical_acceleration - GRAVITY_METERS_PER_SECOND_SQUARED,
+                    ACCEL_DEADBAND_METERS_PER_SECOND_SQUARED,
+                )
for vertical_acceleration in self._rotated_accelerations
]
)
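Note: for readers unfamiliar with the helper, here is a minimal sketch of what `deadband` plausibly does, inferred from the call site above; the real implementation lives in utils.py and may differ:

```python
def deadband(value: float, threshold: float) -> float:
    # Treat small readings as sensor noise: anything inside the band is zeroed.
    return 0.0 if abs(value) < threshold else value

# Illustration with an assumed 0.35 m/s^2 band:
assert deadband(0.2, 0.35) == 0.0  # noise, suppressed
assert deadband(1.5, 0.35) == 1.5  # real acceleration, passed through
```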
31 changes: 14 additions & 17 deletions airbrakes/data_handling/logger.py
@@ -24,8 +24,6 @@ class Logger:

It uses Python's csv module to append the airbrakes' current state, extension, and IMU data to
our logs in real time.

-    :param log_dir: The directory where the log files will be.
"""

__slots__ = ("_log_buffer", "_log_counter", "_log_process", "_log_queue", "log_path")
@@ -95,6 +93,18 @@ def _convert_unknown_type(obj_type: Any) -> str:
"""
return f"{obj_type:.8f}"

+    @staticmethod
+    def _truncate_floats(data: LoggedDataPacket) -> dict[str, str | object]:
+        """
+        Truncates the floats in the dictionary to 8 decimal places.
+        :param data: The dictionary to truncate.
+        :return: The truncated dictionary.
+        """
+        return {
+            key: f"{value:.8f}" if isinstance(value, float) else value
+            for key, value in data.items()
+        }
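Note: the comprehension leaves non-float values untouched and renders floats as fixed 8-decimal strings, which keeps the CSV columns uniform. A quick standalone illustration with made-up fields:

```python
# Standalone copy of the comprehension above, applied to a fake packet dict.
sample = {"state": "CoastState", "vertical_velocity": 123.456789012345, "extension": 0.5}
truncated = {
    key: f"{value:.8f}" if isinstance(value, float) else value
    for key, value in sample.items()
}
print(truncated)
# {'state': 'CoastState', 'vertical_velocity': '123.45678901', 'extension': '0.50000000'}
```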

def start(self) -> None:
"""
Starts the logging process. This is called before the main while loop starts.
@@ -207,7 +217,7 @@ def _prepare_log_dict(
# Convert the processed data packet to a dictionary. Unknown types such as numpy
# float64 are converted to strings with 8 decimal places (that's enc_hook)
processed_data_packet_dict: dict[str, float] = to_builtins(
-            processed_data_packets.popleft(), enc_hook=self._convert_unknown_type
+            processed_data_packets.popleft(), enc_hook=Logger._convert_unknown_type
)
# Let's drop the "time_since_last_data_packet" field:
processed_data_packet_dict.pop("time_since_last_data_packet", None)
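Note on the enc_hook change: `to_builtins` appears to be msgspec's, and per the comment above the hook is invoked for types the encoder does not natively handle, such as numpy scalars. Switching from `self._convert_unknown_type` to `Logger._convert_unknown_type` just names the static method explicitly. A hedged, self-contained sketch of the mechanism (assuming msgspec and numpy):

```python
import msgspec
import numpy as np

def convert_unknown_type(obj) -> str:
    # Mirrors Logger._convert_unknown_type: fall back to an 8-decimal string.
    return f"{obj:.8f}"

data = {"current_altitude": np.float64(1234.5)}
print(msgspec.to_builtins(data, enc_hook=convert_unknown_type))
# Expected: {'current_altitude': '1234.50000000'}, with np.float64 routed
# through the hook (as the comment in this file describes).
```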
@@ -220,19 +230,6 @@

return logged_data_packets

-    # ------------------------------- RUN IN A SEPARATE PROCESS -----------------------------------
-    @staticmethod
-    def _truncate_floats(data: LoggedDataPacket) -> dict[str, str | object]:
-        """
-        Truncates the decimal place of the floats in the dictionary to 8 decimal places.
-        :param data: The dictionary to truncate.
-        :return: The truncated dictionary.
-        """
-        return {
-            key: f"{value:.8f}" if isinstance(value, float) else value
-            for key, value in data.items()
-        }

def _logging_loop(self) -> None:
"""
The loop that saves data to the logs. It runs in parallel with the main loop.
@@ -249,4 +246,4 @@
# If the message is the stop signal, break out of the loop
if message_fields == STOP_SIGNAL:
break
-            writer.writerow(self._truncate_floats(message_fields))
+            writer.writerow(Logger._truncate_floats(message_fields))
157 changes: 88 additions & 69 deletions airbrakes/hardware/imu.py
@@ -15,7 +15,12 @@
IMUDataPacket,
RawDataPacket,
)
-from constants import ESTIMATED_DESCRIPTOR_SET, MAX_QUEUE_SIZE, PROCESS_TIMEOUT, RAW_DESCRIPTOR_SET
+from constants import (
+    ESTIMATED_DESCRIPTOR_SET,
+    IMU_TIMEOUT_SECONDS,
+    MAX_QUEUE_SIZE,
+    RAW_DESCRIPTOR_SET,
+)


class IMU:
@@ -42,8 +47,6 @@ def __init__(self, port: str) -> None:
"""
Initializes the object that interacts with the physical IMU connected to the pi.
:param port: the port that the IMU is connected to
-        :param frequency: the frequency that the IMU is set to poll at (this can be checked in
-            SensorConnect)
"""
# Shared Queue which contains the latest data from the IMU. The MAX_QUEUE_SIZE is there
# to prevent memory issues. Realistically, the queue size never exceeds 50 packets when
@@ -66,6 +69,66 @@ def is_running(self) -> bool:
"""
return self._running.value

+    @staticmethod
+    def _initialize_packet(packet) -> IMUDataPacket | None:
+        """
+        Initialize an IMU data packet based on its descriptor set.
+        :param packet: The data packet from the IMU.
+        :return: An IMUDataPacket, or None if the packet is unrecognized.
+        """
+        # Extract the timestamp from the packet.
+        timestamp = packet.collectedTimestamp().nanoseconds()
+
+        # Initialize the packet with the timestamp; the descriptor set determines
+        # whether it is raw or estimated.
+        if packet.descriptorSet() == ESTIMATED_DESCRIPTOR_SET:
+            return EstimatedDataPacket(timestamp)
+        if packet.descriptorSet() == RAW_DESCRIPTOR_SET:
+            return RawDataPacket(timestamp)
+        return None
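Note: ESTIMATED_DESCRIPTOR_SET and RAW_DESCRIPTOR_SET come from constants.py and are not shown in this diff. In MSCL's MIP protocol the raw-sensor and estimation-filter data sets are conventionally 0x80 and 0x82, so the constants plausibly look like this sketch; treat the values as an unconfirmed assumption:

```python
# Assumed values, mirroring common MIP descriptor-set conventions.
RAW_DESCRIPTOR_SET = 0x80        # raw sensor data (accel, gyro, ...)
ESTIMATED_DESCRIPTOR_SET = 0x82  # estimation filter data (orientation, ...)
```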

+    @staticmethod
+    def _process_data_point(data_point, channel: str, imu_data_packet) -> None:
+        """
+        Process an individual data point and set its value in the data packet object.
+        :param data_point: The IMU data point containing the measurement.
+        :param channel: The channel name of the data point.
+        :param imu_data_packet: The data packet object to update.
+        """
+        # Handle special channels that represent quaternion data.
+        if channel in {"estAttitudeUncertQuaternion", "estOrientQuaternion"}:
+            # Convert quaternion data into a 4x1 matrix and set its components.
+            matrix = data_point.as_Matrix()
+            setattr(imu_data_packet, f"{channel}W", matrix.as_floatAt(0, 0))
+            setattr(imu_data_packet, f"{channel}X", matrix.as_floatAt(0, 1))
+            setattr(imu_data_packet, f"{channel}Y", matrix.as_floatAt(0, 2))
+            setattr(imu_data_packet, f"{channel}Z", matrix.as_floatAt(0, 3))
+        else:
+            # Set other data points directly as attributes in the data packet.
+            setattr(imu_data_packet, channel, data_point.as_float())
+
+        # Check if the data point is invalid and update the invalid fields list.
+        if not data_point.valid():
+            if imu_data_packet.invalid_fields is None:
+                imu_data_packet.invalid_fields = []
+            imu_data_packet.invalid_fields.append(channel)

+    @staticmethod
+    def _process_packet_data(packet, imu_data_packet) -> None:
+        """
+        Process the data points in the packet and update the data packet object.
+        :param packet: The IMU data packet containing multiple data points.
+        :param imu_data_packet: The initialized data packet object to populate.
+        """
+        # Iterate through each data point in the packet.
+        for data_point in packet.data():
+            # Extract the channel name of the data point.
+            channel = data_point.channelName()
+
+            # Check if the channel is relevant for the data packet.
+            if hasattr(imu_data_packet, channel) or "Quaternion" in channel:
+                # Process and set the data point value in the data packet.
+                IMU._process_data_point(data_point, channel, imu_data_packet)

def start(self) -> None:
"""
Starts the process separate from the main process for fetching data from the IMU.
Expand All @@ -87,17 +150,15 @@ def stop(self) -> None:
# indefinitely (that's why there's a timeout in the get_imu_data_packet() method).
with contextlib.suppress(multiprocessing.TimeoutError):
self.get_imu_data_packets()
-        self._data_fetch_process.join(timeout=PROCESS_TIMEOUT)
+        self._data_fetch_process.join(timeout=IMU_TIMEOUT_SECONDS)

def get_imu_data_packet(self) -> IMUDataPacket | None:
"""
Gets the last available data packet from the IMU.
-        Note: If `get_imu_data_packet` is called slower than the frequency set, the data will not
-        be the latest, but the first in the queue.
:return: an IMUDataPacket object containing the latest data from the IMU. If a value is not
-       available, it will be None.
+            available, it will be None.
"""
-        return self._data_queue.get(timeout=PROCESS_TIMEOUT)
+        return self._data_queue.get(timeout=IMU_TIMEOUT_SECONDS)
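Note: a minimal consumer-side sketch of how this method might be polled from the main loop. `handle_packet` is a hypothetical stand-in, and the timeout behavior (the get raising once the queue stays empty for IMU_TIMEOUT_SECONDS) is assumed from the stop() comment above:

```python
def handle_packet(packet) -> None:  # hypothetical downstream consumer
    print(packet)

def run_main_loop(imu) -> None:
    # Poll the fetch process's shared queue until the IMU is stopped.
    while imu.is_running:
        packet = imu.get_imu_data_packet()
        if packet is not None:
            handle_packet(packet)
```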

def get_imu_data_packets(self) -> collections.deque[IMUDataPacket]:
"""
@@ -123,69 +184,27 @@ def _query_imu_for_data_packets(self, port: str) -> None:
self._fetch_data_loop(port)

def _fetch_data_loop(self, port: str) -> None:
-        # Connect to the IMU
-        connection = mscl.Connection.Serial(port)
-        node = mscl.InertialNode(connection)
-        timeout = 10
-
+        """
+        Continuously fetch data packets from the IMU and process them.
+        :param port: The serial port to connect to the IMU.
+        """
        while self._running.value:
-            # Get the latest data packets from the IMU, with the help of `getDataPackets`.
-            # `getDataPackets` accepts a timeout in milliseconds.
-            # During IMU configuration (outside of this code), we set the sampling rate of the IMU
-            # as 1ms for RawDataPackets, and 2ms for EstimatedDataPackets.
-            # So we use a timeout of 10ms which should be more
-            # than enough. If the timeout is hit, the function will return an empty list.
-
-            packets: mscl.MipDataPackets = node.getDataPackets(timeout)
-            # Every loop iteration we get the latest data in form of packets from the imu
+            # Reconnect to the IMU at the start of each loop iteration.
+            connection = mscl.Connection.Serial(port)
+            node = mscl.InertialNode(connection)
+
+            # Retrieve data packets from the IMU.
+            packets: mscl.MipDataPackets = node.getDataPackets(timeout=10)

for packet in packets:
-                # The data packet from the IMU:
-                packet: mscl.MipDataPacket
-
-                # Get the timestamp of the packet
-                timestamp = packet.collectedTimestamp().nanoseconds()
-
-                # Initialize packet with the timestamp, determines if the packet is raw or estimated
-                if packet.descriptorSet() == ESTIMATED_DESCRIPTOR_SET:
-                    imu_data_packet = EstimatedDataPacket(timestamp)
-                elif packet.descriptorSet() == RAW_DESCRIPTOR_SET:
-                    imu_data_packet = RawDataPacket(timestamp)
-                else:
-                    # This is an unknown packet, so we skip it
+                # Initialize the appropriate data packet.
+                imu_data_packet = IMU._initialize_packet(packet)
+                if imu_data_packet is None:
+                    # Skip unrecognized packets.
                    continue

-                # Each of these packets has multiple data points
-                for data_point in packet.data():
-                    data_point: mscl.MipDataPoint
-                    channel: str = data_point.channelName()
-                    # This cpp file was the only place I was able to find all the channel names
-                    # https://github.com/LORD-MicroStrain/MSCL/blob/master/MSCL/source/mscl/MicroStrain/MIP/MipTypes.cpp
-                    # Check if the channel name is one we want to save
-                    if hasattr(imu_data_packet, channel) or "Quaternion" in channel:
-                        # First checks if the data point needs special handling, if not, just set
-                        # the attribute
-                        match channel:
-                            # These specific data points are matrix's rather than doubles
-                            case "estAttitudeUncertQuaternion" | "estOrientQuaternion":
-                                # This makes a 4x1 matrix from the data point with the data as
-                                # [[w], [x], [y], [z]]
-                                matrix = data_point.as_Matrix()
-                                # Sets the W, X, Y, and Z of the quaternion to the data packet
-                                setattr(imu_data_packet, f"{channel}W", matrix.as_floatAt(0, 0))
-                                setattr(imu_data_packet, f"{channel}X", matrix.as_floatAt(0, 1))
-                                setattr(imu_data_packet, f"{channel}Y", matrix.as_floatAt(0, 2))
-                                setattr(imu_data_packet, f"{channel}Z", matrix.as_floatAt(0, 3))
-                            case _:
-                                # Because the attribute names in our data packet classes are the
-                                # same as the channel names, we can just set the attribute to the
-                                # value of the data point.
-                                setattr(imu_data_packet, channel, data_point.as_float())
-
-                        # If the data point is invalid, add it to the invalid fields list:
-                        if not data_point.valid():
-                            if imu_data_packet.invalid_fields is None:
-                                imu_data_packet.invalid_fields = []
-                            imu_data_packet.invalid_fields.append(channel)

-                # Put the latest data into the shared queue
+                # Process the packet's data and populate the data packet object.
+                IMU._process_packet_data(packet, imu_data_packet)
+
+                # Add the processed data packet to the shared queue.
self._data_queue.put(imu_data_packet)