From 6046bf85f1efc9131388ec48ee39c8ee6ca0a63b Mon Sep 17 00:00:00 2001 From: semuadmin <28569967+semuadmin@users.noreply.github.com> Date: Wed, 8 May 2024 17:59:20 +0100 Subject: [PATCH] add proprietary msg definitions --- .vscode/settings.json | 2 +- RELEASE_NOTES.md | 6 + pyproject.toml | 2 +- src/pynmeagps/_version.py | 2 +- src/pynmeagps/nmeamessage.py | 7 +- src/pynmeagps/nmeatypes_core.py | 17 +- src/pynmeagps/nmeatypes_get.py | 88 +++++++++- tests/furuno_nmea.log | 2 + tests/kenwood_nmea.log | 5 + tests/test_stream.py | 292 +++++++++++++++++--------------- 10 files changed, 282 insertions(+), 141 deletions(-) create mode 100644 tests/furuno_nmea.log create mode 100644 tests/kenwood_nmea.log diff --git a/.vscode/settings.json b/.vscode/settings.json index 5bc3352..d8118e9 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -4,5 +4,5 @@ "editor.formatOnSave": true, "modulename": "${workspaceFolderBasename}", "distname": "${workspaceFolderBasename}", - "moduleversion": "1.0.35", + "moduleversion": "1.0.36", } \ No newline at end of file diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index b9df246..de7c81d 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,5 +1,11 @@ # pynmeagps Release Notes +### RELEASE 1.0.36 + +ENHANCEMENTS: + +1. Add further proprietary message definitions. + ### RELEASE 1.0.35 FIXES: diff --git a/pyproject.toml b/pyproject.toml index b2cdedc..b332710 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ name = "pynmeagps" authors = [{ name = "semuadmin", email = "semuadmin@semuconsulting.com" }] maintainers = [{ name = "semuadmin", email = "semuadmin@semuconsulting.com" }] description = "NMEA protocol parser and generator" -version = "1.0.35" +version = "1.0.36" license = { file = "LICENSE" } readme = "README.md" requires-python = ">=3.8" diff --git a/src/pynmeagps/_version.py b/src/pynmeagps/_version.py index a0724cd..d3f1fdf 100644 --- a/src/pynmeagps/_version.py +++ b/src/pynmeagps/_version.py @@ -8,4 +8,4 @@ :license: BSD 3-Clause """ -__version__ = "1.0.35" +__version__ = "1.0.36" diff --git a/src/pynmeagps/nmeamessage.py b/src/pynmeagps/nmeamessage.py index 7db3b7f..b8781ec 100644 --- a/src/pynmeagps/nmeamessage.py +++ b/src/pynmeagps/nmeamessage.py @@ -261,6 +261,7 @@ def _get_dict(self, **kwargs) -> dict: f"P{key} message definitions must " + "include payload or msgId keyword arguments." ) + key = key.upper() if self._mode == nmt.POLL: return nmp.NMEA_PAYLOADS_POLL[key] if self._mode == nmt.SET: @@ -363,12 +364,14 @@ def identity(self) -> str: :rtype: str """ + # pylint: disable=no-member + if ( - self.talker == "P" + self._talker == "P" and self._msgID in nmt.PROP_MSGIDS and hasattr(self, "msgId") ): - return self._talker + self._msgID + self.msgId # pylint: disable=no-member + return self._talker + self._msgID + self.msgId return self._talker + self._msgID @property diff --git a/src/pynmeagps/nmeatypes_core.py b/src/pynmeagps/nmeatypes_core.py index 2c0eae8..72fa612 100644 --- a/src/pynmeagps/nmeatypes_core.py +++ b/src/pynmeagps/nmeatypes_core.py @@ -24,7 +24,7 @@ ERR_LOG = 1 ERR_RAISE = 2 # proprietary messages where msgId is first element of payload: -PROP_MSGIDS = ("UBX", "TNL", "ASHR", "GPPADV") +PROP_MSGIDS = ("FEC", "UBX", "TNL", "ASHR", "GPPADV") GNSSLIST = { 0: "GPS", @@ -242,9 +242,17 @@ "GRMW": "Set Additional Waypoint Information", "GRMZ": "Altitude", # *************************************************************** - # JVCKenwood Proprietary message types + # JVC Kenwood Proprietary message types # *************************************************************** + "KLDS": "Position, Speed, Course", "KLSH": "FleetSync GNSS sentence", + "KNDS": "Position, Speed, Course", + "KNSH": "Position", + "KWDWPL": "Waypoint Location", + # *************************************************************** + # Magellan Proprietary message types + # *************************************************************** + "MGNWPL": "Waypoint Location", # *************************************************************** # U-BLOX Proprietary message types # *************************************************************** @@ -308,6 +316,11 @@ "TNLVGK": "Vector information", "TNLVHD": "Heading information", # *************************************************************** + # Furuno + # *************************************************************** + "FECGPATT": "Attitude yaw, pitch, roll", + "FECGPHVE": "Heave", + # *************************************************************** # Dummy message for testing only # *************************************************************** "FOO": "Dummy message", diff --git a/src/pynmeagps/nmeatypes_get.py b/src/pynmeagps/nmeatypes_get.py index 4681061..b92e691 100644 --- a/src/pynmeagps/nmeatypes_get.py +++ b/src/pynmeagps/nmeatypes_get.py @@ -402,8 +402,25 @@ "ltzn": ST, }, # ********************************************* - # JVCKENWOOD PROPRIETARY MESSAGES + # JVC KENWOOD PROPRIETARY MESSAGES # ********************************************* + "KLDS": { + "time": TM, + "status": CH, + "lat": LA, + "NS": LAD, + "lon": LN, + "EW": LND, + "sog": DE, + "cog": DE, + "dat": DT, + "declination": DE, + "dec_dir": CH, + "fleet": DE, + "senderid": ST, + "senderstatus": DE, + "reserved": DE, + }, "KLSH": { "lat": LA, "NS": LAD, @@ -414,6 +431,60 @@ "fleetId": ST, "deviceId": ST, }, + "KNDS": { + "time": TM, + "status": CH, + "lat": LA, + "NS": LAD, + "lon": LN, + "EW": LND, + "sog": DE, + "cog": DE, + "date": DT, + "declination": DE, + "dec_dir": CH, + "senderid": ST, + "senderstatus": DE, + "reserved": DE, + }, + "KNSH": { + "lat": LA, + "NS": LAD, + "lon": LN, + "EW": LND, + "time": TM, + "status": CH, + "senderid": ST, + }, + "KWDWPL": { + "time": TM, + "status": CH, + "lat": LA, + "NS": LAD, + "lon": LN, + "EW": LND, + "sog": DE, + "cog": DE, + "date": DT, + "alt": DE, + "wpt": ST, + "ts": ST, + }, + # ********************************************* + # MAGELLAN PROPRIETARY MESSAGES + # ********************************************* + "MGNWPL": { + "lat": LA, + "NS": LAD, + "lon": LN, + "EW": LND, + "alt": DE, + "alt_unit": CH, + "wpt": ST, + "comment": ST, + "icon": ST, + "type": ST, + }, # ********************************************* # GARMIN PROPRIETARY MESSAGES # ********************************************* @@ -1185,6 +1256,21 @@ "unit": CH, # 'M' }, # ********************************************* + # Furuno + # https://www.furuno.com/en/support/sdk/ + # ********************************************* + "FECGPATT": { + "msgId": ST, # 'GPatt' + "yaw": DE, + "pitch": DE, + "roll": DE, + }, + "FECGPHVE": { + "msgId": ST, # 'GPhve' + "heave": DE, + "status": CH, # 'A' + }, + # ********************************************* # Dummy message for error testing # ********************************************* "FOO": {"spam": "Z2", "eggs": "Y1"}, diff --git a/tests/furuno_nmea.log b/tests/furuno_nmea.log new file mode 100644 index 0000000..581df00 --- /dev/null +++ b/tests/furuno_nmea.log @@ -0,0 +1,2 @@ +$PFEC,GPatt,12.345,23.456,34.567,*50 +$PFEC,GPhve,12.345,A*0E diff --git a/tests/kenwood_nmea.log b/tests/kenwood_nmea.log new file mode 100644 index 0000000..729081c --- /dev/null +++ b/tests/kenwood_nmea.log @@ -0,0 +1,5 @@ +$PKWDWPL,150803,V,4237.14,N,07120.83,W,,,190316,,test,5,/'*30 +$PKLDS,001235,A,3544.6650,N,13940.1900,E,015.0,038.8,110498,10.80,W00,100,2000,15,00,*7E +$PKNDS,124640,A,4954.1458,N,11923.5992,W,000.0,000.0,120223,19.20,W00,U00002,207,00,*29 +$PKLSH,4000.0000,N,13500.0000,E,021720,A,100,2000,*2C +$PKNSH,4000.0000,N,13500.0000,E,021720,A,U00001,*55 diff --git a/tests/test_stream.py b/tests/test_stream.py index d442474..9aeaaa2 100644 --- a/tests/test_stream.py +++ b/tests/test_stream.py @@ -21,41 +21,15 @@ ERR_LOG, ) +DIRNAME = os.path.dirname(__file__) + class StreamTest(unittest.TestCase): def setUp(self): self.maxDiff = None - dirname = os.path.dirname(__file__) - self.streamNMEA2 = open(os.path.join(dirname, "pygpsdata-nmea2.log"), "rb") - self.streamNMEA4 = open(os.path.join(dirname, "pygpsdata-nmea4.log"), "rb") - self.streamMIXED = open(os.path.join(dirname, "pygpsdata-mixed.log"), "rb") - self.streamNMEA4SM = open(os.path.join(dirname, "pygpsdata-nmea4sm.log"), "rb") - self.streamBADEOF = open(os.path.join(dirname, "pygpsdata-badeof.log"), "rb") - self.streamNMEASTARTUP = open( - os.path.join(dirname, "pygpsdata-nmeastartup.log"), "rb" - ) - self.streamNMEAFOO1 = open( - os.path.join(dirname, "pygpsdata-nmeafoo1.log"), "rb" - ) - self.streamNMEAFOO2 = open( - os.path.join(dirname, "pygpsdata-nmeafoo2.log"), "rb" - ) - self.streamNMEABADCK = open( - os.path.join(dirname, "pygpsdata-nmeabadck2.log"), "rb" - ) - self.streamTRIMBLE = open(os.path.join(dirname, "trimble_nmea.log"), "rb") def tearDown(self): - self.streamNMEA2.close() - self.streamNMEA4.close() - self.streamMIXED.close() - self.streamNMEA4SM.close() - self.streamBADEOF.close() - self.streamNMEASTARTUP.close() - self.streamNMEAFOO1.close() - self.streamNMEAFOO2.close() - self.streamNMEABADCK.close() - self.streamTRIMBLE.close() + pass def catchio(self): """ @@ -92,12 +66,13 @@ def testNMEASTARTUP(self): # stream of NMEA device during start up (blank data) i = 0 raw = 0 - nmr = NMEAReader(self.streamNMEASTARTUP, nmeaonly=False, validate=1) - while raw is not None: - (raw, parsed) = nmr.read() - if raw is not None: - self.assertEqual(str(parsed), EXPECTED_RESULTS[i]) - i += 1 + with open(os.path.join(DIRNAME, "pygpsdata-nmeastartup.log"), "rb") as stream: + nmr = NMEAReader(stream, nmeaonly=False, validate=1) + while raw is not None: + (raw, parsed) = nmr.read() + if raw is not None: + self.assertEqual(str(parsed), EXPECTED_RESULTS[i]) + i += 1 self.assertEqual( i, 12 ) # if this fails, may be because log file terminators = LF rather than CRLF @@ -159,12 +134,13 @@ def testNMEA4( i = 0 raw = 0 - nmr = NMEAReader(self.streamNMEA4, nmeaonly=False, validate=3) - while raw is not None: - (raw, parsed) = nmr.read() - if raw is not None: - self.assertEqual(str(parsed), EXPECTED_RESULTS[i]) - i += 1 + with open(os.path.join(DIRNAME, "pygpsdata-nmea4.log"), "rb") as stream: + nmr = NMEAReader(stream, nmeaonly=False, validate=3) + while raw is not None: + (raw, parsed) = nmr.read() + if raw is not None: + self.assertEqual(str(parsed), EXPECTED_RESULTS[i]) + i += 1 self.assertEqual(i, 49) def testNMEA2(self): # stream of NMEA v2.30 device (u-blox M6N) @@ -190,12 +166,13 @@ def testNMEA2(self): # stream of NMEA v2.30 device (u-blox M6N) i = 0 raw = 0 - nmr = NMEAReader(self.streamNMEA2, nmeaonly=False) - while raw is not None: - (raw, parsed) = nmr.read() - if raw is not None: - self.assertEqual(str(parsed), EXPECTED_RESULTS[i]) - i += 1 + with open(os.path.join(DIRNAME, "pygpsdata-nmea2.log"), "rb") as stream: + nmr = NMEAReader(stream, nmeaonly=False) + while raw is not None: + (raw, parsed) = nmr.read() + if raw is not None: + self.assertEqual(str(parsed), EXPECTED_RESULTS[i]) + i += 1 self.assertEqual(i, 17) def testMIXED( @@ -221,26 +198,28 @@ def testMIXED( i = 0 raw = 0 - nmr = NMEAReader(self.streamMIXED, nmeaonly=False) - while raw is not None: - (raw, parsed) = nmr.read() - if raw is not None: - self.assertEqual(str(parsed), EXPECTED_RESULTS[i]) - i += 1 + with open(os.path.join(DIRNAME, "pygpsdata-mixed.log"), "rb") as stream: + nmr = NMEAReader(stream, nmeaonly=False) + while raw is not None: + (raw, parsed) = nmr.read() + if raw is not None: + self.assertEqual(str(parsed), EXPECTED_RESULTS[i]) + i += 1 self.assertEqual(i, 15) def testMIXED2( self, ): # stream of mixed NMEA & UBX data with nmea_only set to TRUE - should be rejected EXPECTED_ERROR = "Unknown data header b'$\\x11'" - with self.assertRaises(NMEAParseError) as context: - i = 0 - raw = 0 - nmr = NMEAReader(self.streamMIXED, nmeaonly=True, quitonerror=ERR_RAISE) - while raw is not None: - (raw, _) = nmr.read() - if raw is not None: - i += 1 + with open(os.path.join(DIRNAME, "pygpsdata-mixed.log"), "rb") as stream: + with self.assertRaises(NMEAParseError) as context: + i = 0 + raw = 0 + nmr = NMEAReader(stream, nmeaonly=True, quitonerror=ERR_RAISE) + while raw is not None: + (raw, _) = nmr.read() + if raw is not None: + i += 1 self.assertTrue(EXPECTED_ERROR in str(context.exception)) def testNMEAITER(self): # NMEAReader iterator @@ -257,11 +236,12 @@ def testNMEAITER(self): # NMEAReader iterator i = 0 raw = 0 - nmr = NMEAReader(self.streamNMEA4SM, nmeaonly=False) - for raw, parsed in nmr: - if raw is not None: - self.assertEqual(str(parsed), EXPECTED_RESULTS[i]) - i += 1 + with open(os.path.join(DIRNAME, "pygpsdata-nmea4sm.log"), "rb") as stream: + nmr = NMEAReader(stream, nmeaonly=False) + for raw, parsed in nmr: + if raw is not None: + self.assertEqual(str(parsed), EXPECTED_RESULTS[i]) + i += 1 self.assertEqual(i, 8) def testNMEAITERATE(self): # NMEAReader helper method @@ -278,90 +258,97 @@ def testNMEAITERATE(self): # NMEAReader helper method i = 0 raw = 0 - nmr = NMEAReader(self.streamNMEA4SM, nmeaonly=False) - for raw, parsed in nmr: - if raw is not None: - self.assertEqual(str(parsed), EXPECTED_RESULTS[i]) - i += 1 + with open(os.path.join(DIRNAME, "pygpsdata-nmea4sm.log"), "rb") as stream: + nmr = NMEAReader(stream, nmeaonly=False) + for raw, parsed in nmr: + if raw is not None: + self.assertEqual(str(parsed), EXPECTED_RESULTS[i]) + i += 1 self.assertEqual(i, 8) def testNMEAITERATE_ERR1( self, ): # NMEAReader iterator with bad checksum EXPECTED_ERROR = "Message GNVTG invalid checksum 3) - should be 30" - with self.assertRaises(NMEAParseError) as context: - nmr = NMEAReader( - self.streamNMEABADCK, - nmeaonly=False, - validate=VALCKSUM, - msgmode=0, - quitonerror=ERR_RAISE, - ) - for raw, parsed in nmr: - pass + with open(os.path.join(DIRNAME, "pygpsdata-nmeabadck2.log"), "rb") as stream: + with self.assertRaises(NMEAParseError) as context: + nmr = NMEAReader( + stream, + nmeaonly=False, + validate=VALCKSUM, + msgmode=0, + quitonerror=ERR_RAISE, + ) + for raw, parsed in nmr: + pass self.assertTrue(EXPECTED_ERROR in str(context.exception)) def testNMEAITERATE_ERR2( self, ): # NMEAReader iterator ignoring bad checksum and passing error handler EXPECTED_RESULT = "" - nmr = NMEAReader( - self.streamNMEABADCK, - nmeaonly=False, - validate=VALCKSUM, - msgmode=0, - quitonerror=ERR_LOG, - errorhandler=lambda e: print(f"I ignored the following error: {e}"), - ) - res = "" - for raw, parsed in nmr: - res = str(parsed) - self.assertEqual(EXPECTED_RESULT, res) + with open(os.path.join(DIRNAME, "pygpsdata-nmeabadck2.log"), "rb") as stream: + nmr = NMEAReader( + stream, + nmeaonly=False, + validate=VALCKSUM, + msgmode=0, + quitonerror=ERR_LOG, + errorhandler=lambda e: print(f"I ignored the following error: {e}"), + ) + res = "" + for raw, parsed in nmr: + res = str(parsed) + self.assertEqual(EXPECTED_RESULT, res) def testNMEAITERATE_ERR3( self, ): # NMEAReader iterator ignoring bad checksum and continuing EXPECTED_RESULT = "" - nmr = NMEAReader( - self.streamNMEABADCK, - nmeaonly=False, - validate=VALCKSUM, - msgmode=0, - quitonerror=ERR_IGNORE, - ) - res = "" - for raw, parsed in nmr: - res = str(parsed) - self.assertEqual(EXPECTED_RESULT, res) - - def testNMEAFOO1(self): # stream containing invalid attribute type - EXPECTED_ERROR = "Unknown attribute type Z2" - with self.assertRaises(NMEAParseError) as context: - i = 0 - raw = 0 + with open(os.path.join(DIRNAME, "pygpsdata-nmeabadck2.log"), "rb") as stream: nmr = NMEAReader( - self.streamNMEAFOO1, + stream, nmeaonly=False, - quitonerror=ERR_RAISE, + validate=VALCKSUM, + msgmode=0, + quitonerror=ERR_IGNORE, ) + res = "" for raw, parsed in nmr: - i += 1 + res = str(parsed) + self.assertEqual(EXPECTED_RESULT, res) + + def testNMEAFOO1(self): # stream containing invalid attribute type + EXPECTED_ERROR = "Unknown attribute type Z2" + with open(os.path.join(DIRNAME, "pygpsdata-nmeafoo1.log"), "rb") as stream: + with self.assertRaises(NMEAParseError) as context: + i = 0 + raw = 0 + nmr = NMEAReader( + stream, + nmeaonly=False, + quitonerror=ERR_RAISE, + ) + for raw, parsed in nmr: + i += 1 self.assertTrue(EXPECTED_ERROR in str(context.exception)) def testNMEAFOO2(self): # stream containing invalid value for attribute type EXPECTED_ERROR = "Incorrect type for attribute spd in msgID RMC" - with self.assertRaises(NMEAParseError) as context: - i = 0 - raw = 0 - nmr = NMEAReader(self.streamNMEAFOO2, nmeaonly=False, quitonerror=ERR_RAISE) - for raw, parsed in nmr: - i += 1 + with open(os.path.join(DIRNAME, "pygpsdata-nmeafoo2.log"), "rb") as stream: + with self.assertRaises(NMEAParseError) as context: + i = 0 + raw = 0 + nmr = NMEAReader(stream, nmeaonly=False, quitonerror=ERR_RAISE) + for raw, parsed in nmr: + i += 1 self.assertTrue(EXPECTED_ERROR in str(context.exception)) def testNMEABADMODE(self): # invalid stream mode EXPECTED_ERROR = "Invalid stream mode 4 - must be 0, 1 or 2." - with self.assertRaises(NMEAParseError) as context: - NMEAReader(self.streamNMEAFOO1, nmeaonly=False, validate=1, msgmode=4) + with open(os.path.join(DIRNAME, "pygpsdata-nmeafoo1.log"), "rb") as stream: + with self.assertRaises(NMEAParseError) as context: + NMEAReader(stream, nmeaonly=False, validate=1, msgmode=4) self.assertTrue(EXPECTED_ERROR in str(context.exception)) def testBADEOF(self): # stream with premature EOF - should just be tolerated @@ -374,12 +361,13 @@ def testBADEOF(self): # stream with premature EOF - should just be tolerated i = 0 raw = 0 - nmr = NMEAReader(self.streamBADEOF, nmeaonly=False) - for raw, parsed in nmr: - if raw is not None: - # print(parsed) - self.assertEqual(str(parsed), EXPECTED_RESULTS[i]) - i += 1 + with open(os.path.join(DIRNAME, "pygpsdata-badeof.log"), "rb") as stream: + nmr = NMEAReader(stream, nmeaonly=False) + for raw, parsed in nmr: + if raw is not None: + # print(parsed) + self.assertEqual(str(parsed), EXPECTED_RESULTS[i]) + i += 1 def testNMEATRIMBLE(self): # test proprietary Trimble messages EXPECTED_RESULTS = ( @@ -408,14 +396,52 @@ def testNMEATRIMBLE(self): # test proprietary Trimble messages i = 0 raw = 0 - nmr = NMEAReader(self.streamTRIMBLE, nmeaonly=False) - for raw, parsed in nmr: - if raw is not None: - # print(parsed) - self.assertEqual(str(parsed), EXPECTED_RESULTS[i]) - i += 1 + with open(os.path.join(DIRNAME, "trimble_nmea.log"), "rb") as stream: + nmr = NMEAReader(stream, nmeaonly=False) + for raw, parsed in nmr: + if raw is not None: + # print(parsed) + self.assertEqual(str(parsed), EXPECTED_RESULTS[i]) + i += 1 self.assertEqual(i, 21) + def testNMEAFURUNO(self): # test proprietary Furuno messages + EXPECTED_RESULTS = ( + "", + "", + ) + + i = 0 + raw = 0 + with open(os.path.join(DIRNAME, "furuno_nmea.log"), "rb") as stream: + nmr = NMEAReader(stream, nmeaonly=False, quitonerror=2) + for raw, parsed in nmr: + if raw is not None: + # print(f'"{parsed}",') + self.assertEqual(str(parsed), EXPECTED_RESULTS[i]) + i += 1 + self.assertEqual(i, 2) + + def testNMEAKENWOOD(self): # test proprietary Kenwood messages + EXPECTED_RESULTS = ( + "", + "", + "", + "", + "", + ) + + i = 0 + raw = 0 + with open(os.path.join(DIRNAME, "kenwood_nmea.log"), "rb") as stream: + nmr = NMEAReader(stream, nmeaonly=False, quitonerror=2) + for raw, parsed in nmr: + if raw is not None: + # print(f'"{parsed}",') + self.assertEqual(str(parsed), EXPECTED_RESULTS[i]) + i += 1 + self.assertEqual(i, 5) + if __name__ == "__main__": # import sys;sys.argv = ['', 'Test.testName']