From bb71d57ab00570239088b3c4c2f2d4dfb15bfe85 Mon Sep 17 00:00:00 2001 From: Kihomenethoth Date: Wed, 14 Jul 2021 12:46:07 -0400 Subject: [PATCH] Sewio updates (#233) * adding hardware option to killerbee * debugging goodfet read * fixed up mypy * add deprecation notice * added changes from PR 148 * fixed indents * changed built-in list types to typing.List * fixed zbstumbler byte type mismatches * rely on capabilities to throw errors, changed other exceptions to TypeError * added back accidentally removed tests for jamming method not supported Co-authored-by: Taylor Co-authored-by: Taylor Co-authored-by: Taylor --- killerbee/__init__.py | 42 ++++---- killerbee/dev_apimote.py | 25 +++-- killerbee/dev_cc253x.py | 4 +- killerbee/dev_freakduino.py | 4 +- killerbee/dev_sewio.py | 134 +++++++++++++++++++++----- killerbee/dev_telosb.py | 17 +++- killerbee/dev_zigduino.py | 165 ++++++++++++++++---------------- setup.py | 2 +- tests/fixtures/test_dt_dump.dcf | 2 + tests/test_apimote_driver.py | 13 ++- tests/test_killerbee_core.py | 11 ++- 11 files changed, 265 insertions(+), 154 deletions(-) create mode 100644 tests/fixtures/test_dt_dump.dcf diff --git a/killerbee/__init__.py b/killerbee/__init__.py index c3d7eaa..204683b 100644 --- a/killerbee/__init__.py +++ b/killerbee/__init__.py @@ -31,7 +31,7 @@ def show_dev(vendor: str=None, product: str=None, gps: str=None, include: str=No @param include: Provide device names in this argument if you would like only these to be enumerated. Aka, include only these items. ''' - fmt: str = "{: >14} {: <20} {: >10}" + fmt: str = "{: >14} {: <30} {: >10}" print((fmt.format("Dev", "Product String", "Serial Number"))) for dev in devlist(vendor=vendor, product=product, gps=gps, include=include): # Using None as a format value is an TypeError in python3 @@ -76,42 +76,39 @@ def __init__(self, device: Optional[str]=None, hardware: Optional[str]=None, dat else: del isSewio - if device is not None: - self.dev = device - if hardware is None: print("Auto-detection is being deprecated - Please specify hardware") - elif self.dev is not None: + elif device is not None: if hardware == "apimote": from .dev_apimote import APIMOTE - self.driver = APIMOTE(self.dev) + self.driver = APIMOTE(device) elif hardware == "rzusbstick": from .dev_rzusbstick import RZUSBSTICK - self.driver = RZUSBSTICK(self.dev, self.__bus) + self.driver = RZUSBSTICK(device, self.__bus) elif hardware == "cc2530": from .dev_cc253x import CC253x - self.driver = CC253x(self.dev, self.__bus, CC253x.VARIANT_CC2530) + self.driver = CC253x(device, self.__bus, CC253x.VARIANT_CC2530) elif hardware == "cc2531": from .dev_cc253x import CC253x - self.driver = CC253x(self.dev, self.__bus, CC253x.VARIANT_CC2531) + self.driver = CC253x(device, self.__bus, CC253x.VARIANT_CC2531) elif hardware == "bumblebee": from .dev_bumblebee import Bumblebee - self.driver = Bumblebee(self.dev, self.__bus) + self.driver = Bumblebee(device, self.__bus) elif hardware == "sl_nodetest": from .dev_sl_nodetest import SL_NODETEST - self.driver = SL_NODETEST(self.dev) + self.driver = SL_NODETEST(device) elif hardware == "sl_beehive": from .dev_sl_beehive import SL_BEEHIVE - self.driver = SL_BEEHIVE(self.dev) + self.driver = SL_BEEHIVE(device) elif hardware == "zigduino": from .dev_zigduino import ZIGDUINO - self.driver = ZIGDUINO(self.dev) + self.driver = ZIGDUINO(device) elif hardware == "freakdruino": from .dev_freakduino import FREAKDUINO - self.driver = FREAKDUINO(self.dev) + self.driver = FREAKDUINO(device) elif hardware == "telosb": from .dev_telosb import TELOSB - self.driver = TELOSB(self.dev) + self.driver = TELOSB(device) # Figure out a device is one is not set, trying USB devices next if self.driver is None: @@ -208,8 +205,10 @@ def __device_is(self, vendorId, productId): @rtype: Boolean @return: True if KillerBee class has device matching the vendor and product IDs provided. ''' - if self.dev.idVendor == vendorId and self.dev.idProduct == productId: return True - else: return False + if self.dev.idVendor == vendorId and self.dev.idProduct == productId: + return True + else: + return False def get_dev_info(self) -> List[str]: ''' @@ -442,7 +441,7 @@ def pnext(self, timeout: int=100) -> Optional[Dict[Union[int, str], Any]]: return self.driver.pnext(timeout) - def jammer_on(self, channel: Optional[int]=None): + def jammer_on(self, channel: Optional[int]=None, method: Optional[str]=None): ''' Attempts reflexive jamming on all 802.15.4 frames. Targeted frames must be >12 bytes for reliable jamming in current firmware. @@ -454,9 +453,10 @@ def jammer_on(self, channel: Optional[int]=None): if self.driver is None: raise KBInterfaceError("Driver not configured") - return self.driver.jammer_on(channel=channel) + return self.driver.jammer_on(channel=channel, method=method) + - def jammer_off(self, channel: Optional[int]=None): + def jammer_off(self): ''' End reflexive jamming on all 802.15.4 frames. Targeted frames must be >12 bytes for reliable jamming in current firmware. @@ -468,5 +468,5 @@ def jammer_off(self, channel: Optional[int]=None): if self.driver is None: raise KBInterfaceError("Driver not configured") - return self.driver.jammer_off(channel=channel) + return self.driver.jammer_off() diff --git a/killerbee/dev_apimote.py b/killerbee/dev_apimote.py index fa9d4f6..7101d66 100755 --- a/killerbee/dev_apimote.py +++ b/killerbee/dev_apimote.py @@ -79,6 +79,7 @@ def __set_capabilities(self) -> None: self.capabilities.setcapab(KBCapabilities.INJECT, True) self.capabilities.setcapab(KBCapabilities.PHYJAM_REFLEX, True) self.capabilities.setcapab(KBCapabilities.SET_SYNC, True) + self.capabilities.setcapab(KBCapabilities.PHYJAM, True) return # KillerBee expects the driver to implement this function @@ -259,19 +260,28 @@ def ping(self, da: Any, panid: Any, sa: Any, channel: Optional[int]=None, page: ''' raise Exception('Not yet implemented') - def jammer_on(self, channel: Optional[int]=None, page: int=0) -> None: + def jammer_on(self, channel: Optional[int]=None, page: int=0, method: Optional[str]=None) -> None: ''' - Not yet implemented. + Implements reflexive jamming or constant carrier wave jamming. @type channel: Integer @param channel: Sets the channel, optional @type page: Integer @param page: Sets the subghz page, not supported on this device @rtype: None ''' + if self.handle is None: raise Exception("Handle does not exist") - self.capabilities.require(KBCapabilities.PHYJAM_REFLEX) + if method is None: + method = "constant" + + if method == "reflexive": + self.capabilities.require(KBCapabilities.PHYJAM_REFLEX) + elif method == "constant": + self.capabilities.require(KBCapabilities.PHYJAM) + else: + raise ValueError('Parameter "method" must be either \'reflexive\' or \'constant\'.') self.handle.RF_promiscuity(1) self.handle.RF_autocrc(0) @@ -280,8 +290,11 @@ def jammer_on(self, channel: Optional[int]=None, page: int=0) -> None: self.set_channel(channel, page) self.handle.CC_RFST_RX() - self.handle.RF_carrier() #constant carrier wave jamming - #self.handle.RF_reflexjam() #reflexive jamming (advanced) + + if method == "reflexive": + self.handle.RF_reflexjam() + elif method == 'constant': + self.handle.RF_carrier() #TODO maybe move sync to byte string rather than int def set_sync(self, sync: int=0xA70F) -> Any: @@ -294,7 +307,7 @@ def set_sync(self, sync: int=0xA70F) -> Any: raise Exception("Sync word (%x) must be 2-bytes or less." % sync) return self.handle.poke(CC2420_REG_SYNC, sync) - def jammer_off(self, channel: Optional[int]=None, page: int=0) -> None: + def jammer_off(self) -> None: ''' Not yet implemented. @return: None diff --git a/killerbee/dev_cc253x.py b/killerbee/dev_cc253x.py index e2aef96..276c8c4 100644 --- a/killerbee/dev_cc253x.py +++ b/killerbee/dev_cc253x.py @@ -272,7 +272,7 @@ def pnext(self, timeout=100): return ret - def jammer_on(self, channel=None, page=0): + def jammer_on(self, channel=None, page=0, method=None): """ Not yet implemented. @type channel: Integer @@ -289,7 +289,7 @@ def set_sync(self, sync=0xA7): """ raise Exception('Not yet implemented') - def jammer_off(self, channel=None, page=0): + def jammer_off(self): """ Not yet implemented. @return: None diff --git a/killerbee/dev_freakduino.py b/killerbee/dev_freakduino.py index 28d4e5d..1c8ceda 100644 --- a/killerbee/dev_freakduino.py +++ b/killerbee/dev_freakduino.py @@ -313,7 +313,7 @@ def ping(self, da, panid, sa, channel=None, page=0): ''' raise Exception('Not yet implemented') - def jammer_on(self, channel=None, page=0): + def jammer_on(self, channel=None, page=0, method=None): ''' Not yet implemented. @type channel: Integer @@ -333,7 +333,7 @@ def jammer_on(self, channel=None, page=0): #TODO implement raise Exception('Not yet implemented') - def jammer_off(self, channel=None, page=0): + def jammer_off(self): ''' Not yet implemented. @return: None diff --git a/killerbee/dev_sewio.py b/killerbee/dev_sewio.py index b6e6113..ad45dfb 100755 --- a/killerbee/dev_sewio.py +++ b/killerbee/dev_sewio.py @@ -6,7 +6,7 @@ # (C) 2013 Ryan Speers # # For documentation from the vendor, visit: -# http://www.sniffer.wislab.cz/sniffer-configuration/ +# https://www.sewio.net/open-sniffer/ # import os # type: ignore @@ -27,7 +27,7 @@ DEFAULT_IP = "10.10.10.2" #IP address of the sniffer DEFAULT_GW = "10.10.10.1" #IP address of the default gateway DEFAULT_UDP = 17754 #"Remote UDP Port" -TESTED_FW_VERS = ["0.5"] #Firmware versions tested with the current version of this client device connector +TESTED_FW_VERS = ["0.5", "0.9"] #Firmware versions tested with the current version of this client device connector NTP_DELTA = 70*365*24*60*60 #datetime(1970, 1, 1, 0, 0, 0) - datetime(1900, 1, 1, 0, 0, 0) @@ -53,12 +53,29 @@ def ntp_to_system_time(secs, msecs): def getFirmwareVersion(ip): try: - html = urllib.request.urlopen("http://{0}/".format(ip)) fw = re.search(r'Firmware version ([0-9.]+)', html.read()) + #TODO: Have timeout handled sooner + html = urllib2.urlopen("http://{0}/".format(ip), timeout=1) + data = html.read() + # First try for the "old" web UI parsing: + fw = re.search(r'Firmware version ([0-9.]+)', data) if fw is not None: return fw.group(1) + else: + # Detect using the new method, credit to the opensniffer python code + # Find index of slave IP address + index = data.find(ip) + # Find index of parenthesis + indexEnd = data.find(')', index) - 1 + # Find index of comma before parenthesis + indexBeg = data[:indexEnd].rindex(',') + 1 + # Parse FW version out + fw = data[indexBeg:indexEnd] + if fw is not None: + return fw + except Exception as e: - print(("Unable to connect to IP {0} (error: {1}).".format(ip, e))) + print("Unable to connect to IP {0} (error: {1}).".format(ip, e)) return None def getMacAddr(ip): @@ -111,7 +128,12 @@ def __init__(self, dev=DEFAULT_IP, recvport=DEFAULT_UDP, recvip=DEFAULT_GW): self.handle = socket(AF_INET, SOCK_DGRAM) self.handle.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) - self.handle.bind((self.udp_recv_ip, self.udp_recv_port)) + try: + self.handle.bind((self.udp_recv_ip, self.udp_recv_port)) + except Exception as e: + print(e) + print("ERROR: Attempted to bind on UDP {}:{}, but failed.".format(self.udp_recv_ip, self.udp_recv_port)) + print("ERROR: Is that a correct local IP in your environment? Is the port free?") self.__stream_open = False self.capabilities = KBCapabilities() @@ -137,6 +159,10 @@ def __set_capabilities(self): self.capabilities.setcapab(KBCapabilities.SETCHAN, True) self.capabilities.setcapab(KBCapabilities.FREQ_2400, True) self.capabilities.setcapab(KBCapabilities.FREQ_900, True) + if ( self.__revision_num == "0.9.0" ): + self.capabilities.setcapab(KBCapabilities.INJECT, True) + self.capabilities.setcapab(KBCapabilities.PHYJAM, True) + return # KillerBee expects the driver to implement this function @@ -177,7 +203,7 @@ def __sniffer_status(self): if res is None: raise KBInterfaceError("Unable to parse the sniffer's current status.") # RUNNING means it's sniffing, STOPPED means it's not. - return (res.group(1) == "RUNNING") + return res.group(1) == "RUNNING" def __sync_status(self): ''' @@ -251,14 +277,14 @@ def sniffer_off(self): @staticmethod def __get_default_modulation(channel, page=0): ''' - Return the Sewio-specific integer representing the modulation which - should be choosen to be IEEE 802.15.4 complinating for a given channel + Return the Sewio-specific string value representing the modulation which + should be chosen to be IEEE 802.15.4 complinating for a given channel number. Captured values from sniffing Sewio web interface, unsure why these - are done as such. + are encoded in this way. Available modulations are listed at: http://www.sewio.net/open-sniffer/develop/http-rest-interface/ - @rtype: Integer, or None if unable to determine modulation + @rtype: String, or None if unable to determine modulation ''' if channel >= 11 or channel <= 26: return '0' #O-QPSK 250 kb/s 2.4GHz elif channel >= 1 or channel <= 10: return 'c' #O-QPSK 250 kb/s 915MHz @@ -300,10 +326,37 @@ def set_channel(self, channel, page=0): # KillerBee expects the driver to implement this function def inject(self, packet, channel=None, count=1, delay=0, page=0): ''' - Not implemented. + Injects the specified packet contents. + @type packet: String + @param packet: Packet contents to transmit, without FCS. + @type channel: Integer + @param channel: Sets the channel, optional + @type count: Integer + @param count: Transmits a specified number of frames, def=1 + @type delay: Float + @param delay: Delay between each frame, def=0 + @rtype: None ''' + self.capabilities.require(KBCapabilities.INJECT) + if len(packet) < 1: + raise ValueError('Empty packet') + if len(packet) > 125: # 127 -2 to accommodate FCS + raise ValueError('Packet too long') + + if channel != None: + self.set_channel(channel) + + packet_length = len(packet) + packet = packet.encode('latin-1') + + self.__make_rest_call( + "inject.cgi?chn={0}&modul=0&txlevel=0&rxen=1&nrepeat={1}&tspace={2}&autocrc=1&spayload={3}&len={4}".format( + self._channel, count, delay, packet, packet_length + ) + ) + @staticmethod def __parse_zep_v2(data): ''' @@ -395,10 +448,9 @@ def pnext(self, timeout=100): continue # Dissect the UDP packet (frame, ch, validcrc, rssi, lqival, recdtime) = self.__parse_zep_v2(data) - print("Valid CRC", validcrc, "LQI", lqival, "RSSI", rssi) - if frame == None or (ch is not None and ch != self._channel): - #TODO this maybe should be an error condition, instead of ignored? - print(("ZEP parsing issue (bytes length={0}, channel={1}).".format(len(frame) if frame is not None else None, ch))) + if frame == None: # or (ch is not None and ch != self._channel): + #TODO: Sort out situations for error vs warning + print("WARN: ZEP parsing issue (bytes length={0}, channel={1}).".format(len(frame) if frame is not None else None, ch)) continue break @@ -409,27 +461,59 @@ def pnext(self, timeout=100): #Note that 0,1,2 indicies inserted twice for backwards compatibility. result = {0:frame, 1:validcrc, 2:rssi, 'bytes':frame, 'validcrc':validcrc, 'rssi':rssi, 'dbm':None, 'location':None, 'datetime':recdtime} if rssi is not None: - # Per note from Sewino team regarding encoding of RSSI value as 2's complement dBm values + # Per note from Sewio team regarding encoding of RSSI value as 2's complement dBm values if rssi > 127: result['dbm'] = rssi - 256 else: result['dbm'] = rssi return result - def jammer_on(self, channel=None, page=0): + def jammer_on(self, channel=None, page=0, method=None): ''' - Not yet implemented. + Transmit a constant jamming signal following the given mode. @type channel: Integer @param channel: Sets the channel, optional - @type page: Integer - @param page: Sets the subghz page, not supported on this device + @type method: String + @param method: One of the mode values supported by the device: + "1" - PRBS: AAAA... + "2" - PRBS: 0000... + "3" - PRBS: FFFF... + "4" - Fc - 0.5 MHz + "5" - Fc + 0.5 MHz @rtype: None ''' self.capabilities.require(KBCapabilities.PHYJAM) - - def jammer_off(self, channel=None, page=0): - ''' - Not yet implemented. - @return: None + if method is not None and method not in ["1","2","3","4","5"]: + raise ValueError("Jamming method is unsupported by this driver.") + elif method is None: + method = "1" #default to 1 + + if channel != None: + self.set_channel(channel) + + # Parameter enumeration + # http://10.10.10.2/test.cgi?chn=15&mode=1&module=0&txlevel=0 + # + # txlevel + # 0 - 3.0 dBm + # 2 - 2.3 dBm + # 4 - 1.3 dBm + # 6 - 0.0 dBm + # 9 - -3.0 dBm + # c - -7.0 dBm + # f - -17.0 dBm + + if not self.__make_rest_call("test.cgi?chn={0}&mode={1}&module=0&txlevel=0".format(self._channel, method), fetch=False): + raise KBInterfaceError("Error instructing sniffer to start jamming.") + + def jammer_off(self): + ''' + Instruct the device to stop jamming. + @type channel: Integer + @param channel: Sets the channel, optional @rtype: None ''' self.capabilities.require(KBCapabilities.PHYJAM) + + if not self.__make_rest_call("status.cgi?p=4"): + raise KBInterfaceError("Error instructing sniffer to stop jamming.") + diff --git a/killerbee/dev_telosb.py b/killerbee/dev_telosb.py index 77b8c4e..a42fef2 100644 --- a/killerbee/dev_telosb.py +++ b/killerbee/dev_telosb.py @@ -199,21 +199,28 @@ def ping(self, da, panid, sa, channel=None, page=0): ''' raise Exception('Not yet implemented') - def jammer_on(self, channel=None, page=0): + def jammer_on(self, channel=None, page=0, method=None): ''' - Not yet implemented. + Implements reflexive jamming. + Targeted frames must be >12 bytes for reliable jamming in current firmware. @type channel: Integer @param channel: Sets the channel, optional @type page: Integer @param page: Sets the subghz page, not support on this device @rtype: None ''' + + if channel is not None: + self.set_channel(channel, page) + + if method is None or method not in ["reflexive"]: + raise ValueError("Jamming method is unsupported by this driver.") + self.capabilities.require(KBCapabilities.PHYJAM_REFLEX) self.handle.RF_promiscuity(1) self.handle.RF_autocrc(0) - if channel != None: - self.set_channel(channel, page) + self.handle.CC_RFST_RX() self.handle.RF_reflexjam() @@ -224,7 +231,7 @@ def set_sync(self, sync=0xA70F): raise Exception("Sync word (%x) must be 2-bytes or less." % sync) return self.handle.poke(CC2420_REG_SYNC, sync) - def jammer_off(self, channel=None, page=0): + def jammer_off(self): ''' Not yet implemented. @return: None diff --git a/killerbee/dev_zigduino.py b/killerbee/dev_zigduino.py index 8b31f62..dc7e7fb 100644 --- a/killerbee/dev_zigduino.py +++ b/killerbee/dev_zigduino.py @@ -21,34 +21,33 @@ class ZIGDUINO: def __init__(self, dev): - ''' - Instantiates the KillerBee class for Zigduino running GoodFET firmware. - @type dev: String - @param dev: Serial device identifier (ex /dev/ttyUSB0) - @return: None - @rtype: None - ''' - self._channel = None - self._page = 0 - self.handle = None - self.dev = dev - self.handle = GoodFETatmel128rfa1() - self.handle.serInit(port=self.dev) - self.handle.setup() - - self.__stream_open = False - self.capabilities = KBCapabilities() - self.__set_capabilities() + ''' + Instantiates the KillerBee class for Zigduino running GoodFET firmware. + @type dev: String + @param dev: Serial device identifier (ex /dev/ttyUSB0) + @return: None + @rtype: None + ''' + self._channel = None + self.handle = None + self.dev = dev + self.handle = GoodFETatmel128rfa1() + self.handle.serInit(port=self.dev) + self.handle.setup() + + self.__stream_open = False + self.capabilities = KBCapabilities() + self.__set_capabilities() def close(self): - self.handle.serClose() - self.handle = None + self.handle.serClose() + self.handle = None def check_capability(self, capab): - return self.capabilities.check(capab) + return self.capabilities.check(capab) def get_capabilities(self): - return self.capabilities.getlist() + return self.capabilities.getlist() def __set_capabilities(self): ''' @@ -65,60 +64,60 @@ def __set_capabilities(self): # KillerBee expects the driver to implement this function def get_dev_info(self): - ''' - Returns device information in a list identifying the device. - @rtype: List - @return: List of 3 strings identifying device. - ''' - return [self.dev, "Zigduino", ""] + ''' + Returns device information in a list identifying the device. + @rtype: List + @return: List of 3 strings identifying device. + ''' + return [self.dev, "Zigduino", ""] # KillerBee expects the driver to implement this function def sniffer_on(self, channel=None, page=0): - ''' - Turns the sniffer on such that pnext() will start returning observed - data. Will set the command mode to Air Capture if it is not already - set. - @type channel: Integer - @param channel: Sets the channel, optional - @type page: Integer - @param page: Sets the subghz page, optional - @rtype: None - ''' - self.capabilities.require(KBCapabilities.SNIFF) - - if channel != None or page: - self.set_channel(channel, page) - - self.__stream_open = True + ''' + Turns the sniffer on such that pnext() will start returning observed + data. Will set the command mode to Air Capture if it is not already + set. + @type channel: Integer + @param channel: Sets the channel, optional + @type page: Integer + @param page: Sets the subghz page, optional + @rtype: None + ''' + self.capabilities.require(KBCapabilities.SNIFF) + + if channel != None or page: + self.set_channel(channel, page) + + self.__stream_open = True # KillerBee expects the driver to implement this function def sniffer_off(self): - ''' - Turns the sniffer off, freeing the hardware for other functions. It is - not necessary to call this function before closing the interface with - close(). - @rtype: None - ''' - #TODO actually have firmware stop sending us packets! - self.__stream_open = False + ''' + Turns the sniffer off, freeing the hardware for other functions. It is + not necessary to call this function before closing the interface with + close(). + @rtype: None + ''' + #TODO actually have firmware stop sending us packets! + self.__stream_open = False # KillerBee expects the driver to implement this function def set_channel(self, channel, page=0): - ''' - Sets the radio interface to the specifid channel (limited to 2.4 GHz channels 11-26) - @type channel: Integer - @param channel: Sets the channel - @type page: Integer - @param page: Sets the subghz page, not supported on this device - @rtype: None - ''' - self.capabilities.require(KBCapabilities.SETCHAN) - - if channel >= 11 or channel <= 26: - self._channel = channel - self.handle.RF_setchannel(channel) - else: - raise Exception('Invalid channel') + ''' + Sets the radio interface to the specifid channel (limited to 2.4 GHz channels 11-26) + @type channel: Integer + @param channel: Sets the channel + @type page: Integer + @param page: Sets the subghz page, not supported on this device + @rtype: None + ''' + self.capabilities.require(KBCapabilities.SETCHAN) + + if channel >= 11 or channel <= 26: + self._channel = channel + self.handle.RF_setchannel(channel) + else: + raise Exception('Invalid channel') # KillerBee expects the driver to implement this function def inject(self, packet, channel=None, count=1, delay=0, page=0): @@ -184,15 +183,15 @@ def pnext(self, timeout=100): return result def jammer_on(self, channel=None, page=0): - ''' - Not yet implemented. - @type channel: Integer - @param channel: Sets the channel, optional - @type page: Integer - @param page: Sets the subghz page, not supported on this device - @rtype: None - ''' - raise Exception('Not yet implemented') + ''' + Not yet implemented. + @type channel: Integer + @param channel: Sets the channel, optional + @type page: Integer + @param page: Sets the subghz page, not supported on this device + @rtype: None + ''' + raise Exception('Not yet implemented') def set_sync(self, sync=0xA7): '''Set the register controlling the 802.15.4 PHY sync byte.''' @@ -204,11 +203,11 @@ def set_sync(self, sync=0xA7): return self.handle.poke(ATMEL_REG_SYNC, sync) def jammer_off(self, channel=None, page=0): - ''' - Not yet implemented. - @return: None - @rtype: None - ''' - #TODO implement - raise Exception('Not yet implemented') + ''' + Not yet implemented. + @return: None + @rtype: None + ''' + #TODO implement + raise Exception('Not yet implemented') diff --git a/setup.py b/setup.py index 6e89582..bbe8726 100644 --- a/setup.py +++ b/setup.py @@ -67,7 +67,7 @@ ) setup(name = 'killerbee', - version = '3.0.0-beta.1', + version = '3.0.0-beta.2', description = 'ZigBee and IEEE 802.15.4 Attack Framework and Tools', author = 'Joshua Wright, Ryan Speers', author_email = 'jwright@willhackforsushi.com, ryan@riverloopsecurity.com', diff --git a/tests/fixtures/test_dt_dump.dcf b/tests/fixtures/test_dt_dump.dcf new file mode 100644 index 0000000..3a28346 --- /dev/null +++ b/tests/fixtures/test_dt_dump.dcf @@ -0,0 +1,2 @@ +#Format=4 +# SNA v3.0.0.7 SUS:2021714 ACT:067341 diff --git a/tests/test_apimote_driver.py b/tests/test_apimote_driver.py index 1ff6145..1ca6588 100644 --- a/tests/test_apimote_driver.py +++ b/tests/test_apimote_driver.py @@ -34,7 +34,7 @@ def test_get_capabilities(self): self.assertTrue(capabilities[KBCapabilities.SET_SYNC]) self.assertFalse(capabilities[KBCapabilities.NONE]) self.assertFalse(capabilities[KBCapabilities.SELFACK]) - self.assertFalse(capabilities[KBCapabilities.PHYJAM]) + self.assertTrue(capabilities[KBCapabilities.PHYJAM]) self.assertFalse(capabilities[KBCapabilities.FREQ_900]) self.assertFalse(capabilities[KBCapabilities.FREQ_863]) self.assertFalse(capabilities[KBCapabilities.FREQ_868]) @@ -65,7 +65,7 @@ def test_check_capabilities(self): self.assertTrue(driver.check_capability(KBCapabilities.SET_SYNC)) self.assertFalse(driver.check_capability(KBCapabilities.NONE)) self.assertFalse(driver.check_capability(KBCapabilities.SELFACK)) - self.assertFalse(driver.check_capability(KBCapabilities.PHYJAM)) + self.assertTrue(driver.check_capability(KBCapabilities.PHYJAM)) self.assertFalse(driver.check_capability(KBCapabilities.FREQ_900)) self.assertFalse(driver.check_capability(KBCapabilities.FREQ_863)) self.assertFalse(driver.check_capability(KBCapabilities.FREQ_868)) @@ -116,7 +116,8 @@ def test_inject(self): def test_pnext(self): driver=APIMOTE(os.environ['APIMOTE_DEVSTRING']) - + + driver.set_channel(25) driver.pnext() self.assertTrue(True) @@ -129,8 +130,10 @@ def test_ping(self): def test_jammer_on_off(self): driver=APIMOTE(os.environ['APIMOTE_DEVSTRING']) - driver.jammer_on() - + #driver.jammer_on(None, 0, 'reflexive') + #driver.jammer_on(None, 0, 'constant') + #driver.jammer_on(11, 0, 'constant') + self.assertRaises(Exception, driver.jammer_on, None, 0, 'none') self.assertRaises(Exception, driver.jammer_off) driver.close() diff --git a/tests/test_killerbee_core.py b/tests/test_killerbee_core.py index 322f396..27c2efd 100644 --- a/tests/test_killerbee_core.py +++ b/tests/test_killerbee_core.py @@ -55,9 +55,9 @@ def test_killerbee_check_capability(self): self.assertTrue(kb.check_capability(KBCapabilities.INJECT)) self.assertTrue(kb.check_capability(KBCapabilities.PHYJAM_REFLEX)) self.assertTrue(kb.check_capability(KBCapabilities.SET_SYNC)) + self.assertTrue(kb.check_capability(KBCapabilities.PHYJAM)) self.assertFalse(kb.check_capability(KBCapabilities.NONE)) self.assertFalse(kb.check_capability(KBCapabilities.SELFACK)) - self.assertFalse(kb.check_capability(KBCapabilities.PHYJAM)) self.assertFalse(kb.check_capability(KBCapabilities.FREQ_900)) self.assertFalse(kb.check_capability(KBCapabilities.FREQ_863)) self.assertFalse(kb.check_capability(KBCapabilities.FREQ_868)) @@ -115,8 +115,8 @@ def test_killerbee_get_capabilities(self): self.assertTrue(capabilities[KBCapabilities.PHYJAM_REFLEX]) self.assertTrue(capabilities[KBCapabilities.SET_SYNC]) self.assertTrue(capabilities[KBCapabilities.FREQ_2400]) + self.assertTrue(capabilities[KBCapabilities.PHYJAM]) self.assertFalse(capabilities[KBCapabilities.NONE]) - self.assertFalse(capabilities[KBCapabilities.PHYJAM]) self.assertFalse(capabilities[KBCapabilities.SELFACK]) self.assertFalse(capabilities[KBCapabilities.FREQ_900]) self.assertFalse(capabilities[KBCapabilities.FREQ_863]) @@ -221,6 +221,7 @@ def test_killerbee_inject(self): def test_killerbee_pnext(self): kb = KillerBee(os.environ['APIMOTE_DEVSTRING']) + kb.set_channel(25) kb.pnext() self.assertTrue(True) @@ -230,8 +231,10 @@ def test_killerbee_pnext(self): def test_killerbee_jammer_on_off(self): kb = KillerBee(os.environ['APIMOTE_DEVSTRING']) - kb.jammer_on() - + #kb.jammer_on(None, 'reflexive') + #kb.jammer_on(None, 'constant') + #kb.jammer_on(11, 'constant') + self.assertRaises(Exception, kb.jammer_on, None, 0, 'none') self.assertRaises(Exception, kb.jammer_off) kb.close()