diff --git a/CHANGES.rst b/CHANGES.rst index b4e5c72b..d41615d3 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -9,6 +9,7 @@ in progress - QA: Improve tests on HTTP API for data acquisition - CI: Add testing against Python 3.9 - CI: Run tests against different versions of Mosquitto, InfluxDB and Grafana +- Add possibility to acquire bulk readings in JSON format .. _kotori-0.26.8: diff --git a/kotori/daq/graphing/grafana/dashboard.py b/kotori/daq/graphing/grafana/dashboard.py index 84a5c513..1b67dcd2 100644 --- a/kotori/daq/graphing/grafana/dashboard.py +++ b/kotori/daq/graphing/grafana/dashboard.py @@ -275,8 +275,19 @@ def collect_fields(data, prefixes=None, sorted=True): # time is from intercom.mqtt blacklist = ['_hex_', 'time'] + # Compute list of unique attribute names. + if isinstance(data, dict): + keys = data.keys() + elif isinstance(data, list): + keys = set() + for item in data: + for key in item.keys(): + keys.add(key) + else: + raise ValueError(f"Type of data {type(data)} not accepted") + fields = [] - for field in data.keys(): + for field in keys: if field in blacklist: continue diff --git a/kotori/daq/storage/influx.py b/kotori/daq/storage/influx.py index d48f9f6a..7a0ebcbb 100644 --- a/kotori/daq/storage/influx.py +++ b/kotori/daq/storage/influx.py @@ -64,6 +64,15 @@ def is_udp_database(self, name): return False def write(self, meta, data): + if isinstance(data, dict): + self.write_single(meta, data) + elif isinstance(data, list): + for item in data: + self.write_single(meta, item) + else: + raise ValueError(f"Type of data {type(data)} not accepted") + + def write_single(self, meta, data): meta_copy = deepcopy(dict(meta)) data_copy = deepcopy(data) diff --git a/test/settings/mqttkit.py b/test/settings/mqttkit.py index 4ae03756..f99a921d 100644 --- a/test/settings/mqttkit.py +++ b/test/settings/mqttkit.py @@ -12,6 +12,7 @@ class TestSettings: # InfluxDB settings. influx_database = 'mqttkit_1_itest' influx_measurement_sensors = 'foo_bar_sensors' + influx2_measurement_sensors = 'foo_bar2_sensors' influx_measurement_events = 'foo_bar_events' # Grafana settings. @@ -22,6 +23,7 @@ class TestSettings: # MQTT channel settings. mqtt_topic_single = 'mqttkit-1/itest/foo/bar/data' mqtt_topic_json = 'mqttkit-1/itest/foo/bar/data.json' + mqtt2_topic_json = 'mqttkit-1/itest/foo/bar2/data.json' mqtt_topic_event = 'mqttkit-1/itest/foo/bar/event.json' mqtt_topic_homie = 'mqttkit-1/itest/foo/bar/data/__json__' mqtt_topic_json_legacy = 'mqttkit-1/itest/foo/bar/message-json' diff --git a/test/test_daq_grafana.py b/test/test_daq_grafana.py index 48930082..66bde547 100644 --- a/test/test_daq_grafana.py +++ b/test/test_daq_grafana.py @@ -13,7 +13,7 @@ @pytest_twisted.inlineCallbacks @pytest.mark.grafana -def test_mqtt_to_grafana(machinery, create_influxdb, reset_influxdb, reset_grafana): +def test_mqtt_to_grafana_single(machinery, create_influxdb, reset_influxdb, reset_grafana): """ Publish single reading in JSON format to MQTT broker and proof that a corresponding datasource and a dashboard was created in Grafana. @@ -44,3 +44,55 @@ def test_mqtt_to_grafana(machinery, create_influxdb, reset_influxdb, reset_grafa target = dashboard['dashboard']['rows'][0]['panels'][0]['targets'][0] assert target['measurement'] == settings.influx_measurement_sensors assert 'temperature' in target['query'] or 'humidity' in target['query'] + + +@pytest_twisted.inlineCallbacks +@pytest.mark.grafana +def test_mqtt_to_grafana_bulk(machinery, create_influxdb, reset_influxdb, reset_grafana): + """ + Publish multiple readings in JSON format to MQTT broker and proof + that a corresponding datasource and a dashboard was created in Grafana. + """ + + # Submit multiple measurements, without timestamp. + data = [ + { + 'temperature': 21.42, + 'humidity': 41.55, + }, + { + 'temperature': 42.84, + 'humidity': 83.1, + 'voltage': 4.2, + }, + { + 'weight': 10.10, + }, + ] + yield mqtt_json_sensor(settings.mqtt2_topic_json, data) + + # Wait for some time to process the message. + yield sleep(PROCESS_DELAY_MQTT) + yield sleep(PROCESS_DELAY_MQTT) + yield sleep(PROCESS_DELAY_MQTT) + + # Proof that Grafana is well provisioned. + logger.info('Grafana: Checking datasource') + datasource_names = [] + for datasource in grafana.client.datasources.get(): + datasource_names.append(datasource['name']) + assert settings.influx_database in datasource_names + + logger.info('Grafana: Checking dashboard') + dashboard_name = settings.grafana_dashboards[0] + dashboard = grafana.client.dashboards.db[dashboard_name].get() + targets = dashboard['dashboard']['rows'][0]['panels'][0]['targets'] + + # Validate table name. + assert targets[0]['measurement'] == settings.influx2_measurement_sensors + + # Validate field names. + fields = set() + for target in targets: + fields.add(target["fields"][0]["name"]) + assert fields == set(["temperature", "humidity", "weight", "voltage"]) diff --git a/test/test_daq_mqtt.py b/test/test_daq_mqtt.py index a92b970d..3ca23683 100644 --- a/test/test_daq_mqtt.py +++ b/test/test_daq_mqtt.py @@ -37,6 +37,40 @@ def test_mqtt_to_influxdb_json_single(machinery, create_influxdb, reset_influxdb yield record +@pytest_twisted.inlineCallbacks +@pytest.mark.mqtt +def test_mqtt_to_influxdb_json_bulk(machinery, create_influxdb, reset_influxdb): + """ + Publish multiple readings in JSON format to MQTT broker + and proof it is stored in the InfluxDB database. + """ + + # Submit multiple measurements, without timestamp. + data = [ + { + 'temperature': 21.42, + 'humidity': 41.55, + }, + { + 'temperature': 42.84, + 'humidity': 83.1, + }, + ] + yield threads.deferToThread(mqtt_json_sensor, settings.mqtt_topic_json, data) + + # Wait for some time to process the message. + yield sleep(PROCESS_DELAY_MQTT) + + # Proof that data arrived in InfluxDB. + record = influx_sensors.get_record(index=0) + del record['time'] + assert record == {u'temperature': 21.42, u'humidity': 41.55} + + record = influx_sensors.get_record(index=1) + del record['time'] + assert record == {u'temperature': 42.84, u'humidity': 83.1} + + @pytest_twisted.inlineCallbacks @pytest.mark.mqtt @pytest.mark.legacy