Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix display disappearance, add new field, fix ctrl-c #93

Merged
merged 2 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions airbrakes/hardware/imu.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def stop(self) -> None:
self.get_imu_data_packets()

with contextlib.suppress(multiprocessing.TimeoutError):
self._data_fetch_process.join(timeout=1)
self._data_fetch_process.join(timeout=2)
harshil21 marked this conversation as resolved.
Show resolved Hide resolved

def get_imu_data_packet(self) -> IMUDataPacket | None:
"""
Expand All @@ -99,7 +99,7 @@ def get_imu_data_packet(self) -> IMUDataPacket | None:
:return: an IMUDataPacket object containing the latest data from the IMU. If a value is not
available, it will be None.
"""
return self._data_queue.get(timeout=1)
return self._data_queue.get(timeout=3)
harshil21 marked this conversation as resolved.
Show resolved Hide resolved

def get_imu_data_packets(self) -> collections.deque[IMUDataPacket]:
"""
Expand Down
58 changes: 28 additions & 30 deletions airbrakes/mock/display.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class FlightDisplay:

__slots__ = (
"_airbrakes",
"_apogee_at_convergence",
"_args",
"_coast_time",
"_convergence_height",
Expand Down Expand Up @@ -69,6 +70,7 @@ def __init__(
self._coast_time: int = 0 # Coast time from CoastState
self._convergence_time: float = 0.0 # Time to convergence of apogee from CoastState
self._convergence_height: float = 0.0 # Height at convergence of apogee from CoastState
self._apogee_at_convergence: float = 0.0 # Apogee at prediction convergence from CoastState
# Prepare the processes for monitoring in the simulation:
self._processes: dict[str, psutil.Process] | None = None
self._cpu_usages: dict[str, float] | None = None
Expand Down Expand Up @@ -128,23 +130,23 @@ def update_display(self) -> None:
Updates the display with real-time data. Runs in another thread. Automatically stops when
the simulation ends.
"""
# Don't print the flight data if we are in debug mode
if self._args.debug:
return

while self._running:
if self.end_sim_natural.is_set():
self._update_display(self.NATURAL_END)
break
if self.end_sim_interrupted.is_set():
self._update_display(self.INTERRUPTED_END)
break
if not self._args.mock and self._airbrakes.state.name == "MotorBurnState":
self._update_display(self.STATE_END)
break
# Don't print the flight data if we are in debug mode
if not self._args.debug:
self._update_display(False)
else:
break

def _update_display(self, end_sim: Literal["natural", "interrupted"] | bool = False) -> None:
self._update_display(False)
harshil21 marked this conversation as resolved.
Show resolved Hide resolved

if self.end_sim_natural.is_set():
harshil21 marked this conversation as resolved.
Show resolved Hide resolved
self._update_display(self.NATURAL_END)
if self.end_sim_interrupted.is_set():
self._update_display(self.INTERRUPTED_END)
if not self._args.mock and self._airbrakes.state.name == "MotorBurnState":
self._update_display(self.STATE_END)

def _update_display(
harshil21 marked this conversation as resolved.
Show resolved Hide resolved
self, end_sim: Literal["natural", "interrupted", "state"] | bool = False
) -> None:
"""
Updates the display with real-time data.
:param end_sim: Whether the simulation ended or was interrupted.
Expand Down Expand Up @@ -181,15 +183,10 @@ def _update_display(self, end_sim: Literal["natural", "interrupted"] | bool = Fa
else:
time_since_launch = 0

if (
self._coast_time
and not self._convergence_time
and self._airbrakes.apogee_predictor.apogee
):
self._convergence_time = (
self._airbrakes.data_processor.current_timestamp - self._coast_time
) * 1e-9
if self._coast_time and not self._convergence_time and apogee_predictor.apogee:
self._convergence_time = (data_processor.current_timestamp - self._coast_time) * 1e-9
self._convergence_height = data_processor.current_altitude
self._apogee_at_convergence = apogee_predictor.apogee

# Prepare output
output = [
Expand All @@ -213,12 +210,13 @@ def _update_display(self, end_sim: Literal["natural", "interrupted"] | bool = Fa
output.extend(
[
f"{Y}{'=' * 18} DEBUG INFO {'=' * 17}{RESET}",
f"Convergence Time: {G}{self._convergence_time:<10.2f}{RESET} {R}s{RESET}", # noqa: E501
f"Convergence Height: {G}{self._convergence_height:<10.2f}{RESET} {R}m{RESET}", # noqa: E501
f"IMU Data Queue Size: {G}{current_queue_size:<10}{RESET} {R}packets{RESET}", # noqa: E501
f"Fetched packets: {G}{fetched_packets:<10}{RESET} {R}packets{RESET}",
f"Log buffer size: {G}{len(self._airbrakes.logger._log_buffer):<10}{RESET} {R}packets{RESET}", # noqa: E501
f"Invalid fields: {G}{invalid_fields}{G}{RESET}",
f"Convergence Time: {G}{self._convergence_time:<10.2f}{RESET} {R}s{RESET}", # noqa: E501
f"Convergence Height: {G}{self._convergence_height:<10.2f}{RESET} {R}m{RESET}", # noqa: E501
f"Predicted apogee at Convergence: {G}{self._apogee_at_convergence:<10.2f}{RESET} {R}m{RESET}", # noqa: E501
f"IMU Data Queue Size: {G}{current_queue_size:<10}{RESET} {R}packets{RESET}", # noqa: E501
f"Fetched packets: {G}{fetched_packets:<10}{RESET} {R}packets{RESET}", # noqa: E501
f"Log buffer size: {G}{len(self._airbrakes.logger._log_buffer):<10}{RESET} {R}packets{RESET}", # noqa: E501
f"Invalid fields: {G}{invalid_fields}{G}{RESET}",
f"{Y}{'=' * 13} REAL TIME CPU LOAD {'=' * 14}{RESET}",
]
)
Expand Down
3 changes: 3 additions & 0 deletions utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,7 @@ def arg_parser() -> argparse.Namespace:
"to run in simulation mode."
)

if args.verbose and args.debug:
parser.error("The `--verbose` and `--debug` options cannot be used together.")

return args