Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ in progress
- **InfluxDB:** Added capability to acquire bulk readings in JSON format
- **MQTT:** Added capability to acquire bulk readings in compact JSON format,
with timestamps as keys
- **InfluxDB:** Added decoder for acquiring data in NDJSON format

.. _kotori-0.28.0:

Expand Down
7 changes: 7 additions & 0 deletions kotori/daq/decoder/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# (c) 2019-2021 Andreas Motl <andreas@getkotori.org>
from kotori.daq.decoder.airrohr import AirrohrDecoder
from kotori.daq.decoder.json import CompactTimestampedJsonDecoder
from kotori.daq.decoder.ndjson import NdJsonDecoder
from kotori.daq.decoder.tasmota import TasmotaSensorDecoder, TasmotaStateDecoder
from kotori.daq.decoder.schema import MessageType
from kotori.daq.decoder.tts_ttn import TheThingsStackDecoder
Expand All @@ -25,6 +26,12 @@ def probe(self, payload: str = None):
if 'slot' not in self.topology:
return False

# NDJSON format
if self.topology.slot.endswith('data.ndjson'):
self.info.message_type = MessageType.DATA_CONTAINER
self.info.decoder = NdJsonDecoder
return True

# Compact JSON format, with timestamps as keys
if self.topology.slot.endswith('tc.json'):
self.info.message_type = MessageType.DATA_CONTAINER
Expand Down
41 changes: 41 additions & 0 deletions kotori/daq/decoder/ndjson.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
# (c) 2023 Andreas Motl <andreas@getkotori.org>


class NdJsonDecoder:
    """
    Decode NDJSON payloads. NDJSON is a newline-delimited JSON format.
    It is suitable for submitting multiple JSON records in bulk, or for
    streaming them.

    NDJSON has been called LDJSON, and is also known as JSON Lines, see
    also JSON streaming.

    - http://ndjson.org/
    - https://jsonlines.org/
    - https://en.wikipedia.org/wiki/JSON_streaming

    Documentation
    =============
    - https://getkotori.org/docs/handbook/decoders/ndjson.html (not yet)

    Example
    =======
    ::

        {"temperature":21.42,"humidity":41.55}
        {"temperature":42.84,"humidity":83.1}

    """

    @staticmethod
    def decode(payload):
        """
        Decode an NDJSON payload into a list of records.

        Each non-blank line is parsed on its own with the standard library
        JSON parser, so every record is returned exactly as submitted: no
        keys are added to records that lack them, integers stay integers,
        and date-like strings stay strings.

        :param payload: NDJSON document, either as ``str`` or as UTF-8
                        encoded ``bytes``/``bytearray``.
        :return: List with one decoded JSON value per non-blank line,
                 in input order. An empty payload yields an empty list.
        :raises ValueError: When a line is not valid JSON
                            (``json.JSONDecodeError``), or when a bytes
                            payload is not valid UTF-8.
        """
        import json

        if isinstance(payload, (bytes, bytearray)):
            payload = payload.decode("utf-8")

        records = []
        # Split on "\n" only. `str.splitlines()` would also break on
        # characters like U+2028, which may legally occur unescaped
        # inside JSON string values.
        for line in payload.split("\n"):
            # Stripping also removes the "\r" of CRLF line endings.
            line = line.strip()
            if not line:
                # Tolerate blank lines, e.g. the one after a trailing newline.
                continue
            records.append(json.loads(line))
        return records
Comment on lines +31 to +41

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The decoder could use orjson and orjsonl from the start, skipping pandas completely?

1 change: 1 addition & 0 deletions test/settings/mqttkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class TestSettings:
mqtt_topic_event = 'mqttkit-1/itest/foo/bar/event.json'
mqtt_topic_homie = 'mqttkit-1/itest/foo/bar/data/__json__'
mqtt_topic_json_compact = 'mqttkit-1/itest/foo/bar/tc.json'
mqtt_topic_ndjson = 'mqttkit-1/itest/foo/bar/data.ndjson'
mqtt_topic_json_legacy = 'mqttkit-1/itest/foo/bar/message-json'

# HTTP channel settings.
Expand Down
38 changes: 37 additions & 1 deletion test/test_daq_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from twisted.internet import threads

from test.settings.mqttkit import settings, influx_sensors, PROCESS_DELAY_MQTT, device_influx_sensors
from test.util import mqtt_json_sensor, sleep, mqtt_sensor
from test.util import mqtt_json_sensor, sleep, mqtt_sensor, mqtt_ndjson_sensor

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -110,6 +110,42 @@ def test_mqtt_to_influxdb_json_compact_bulk(machinery, create_influxdb, reset_in
assert record == {u'time': '2021-01-19T18:56:08Z', u'temperature': 42.84, u'humidity': 83.1}


@pytest_twisted.inlineCallbacks
@pytest.mark.mqtt
def test_mqtt_to_influxdb_ndjson_bulk(machinery, create_influxdb, reset_influxdb):
    """
    Publish a batch of readings in NDJSON format to the MQTT broker,
    and verify that each of them is stored in the InfluxDB database.

    TODO: Grafana provisioning failed!
    """

    # Two measurements, none of which carries a timestamp.
    readings = [
        {'temperature': 21.42, 'humidity': 41.55},
        {'temperature': 42.84, 'humidity': 83.1},
    ]
    yield threads.deferToThread(mqtt_ndjson_sensor, settings.mqtt_topic_ndjson, readings)

    # Give the machinery some time to process the message.
    yield sleep(PROCESS_DELAY_MQTT)

    # The record at each index must match the reading submitted at that
    # position. The `time` field is removed before comparing, because the
    # readings were submitted without one.
    for position, expected in enumerate(readings):
        stored = influx_sensors.get_record(index=position)
        del stored['time']
        assert stored == expected


@pytest_twisted.inlineCallbacks
@pytest.mark.mqtt
@pytest.mark.legacy
Expand Down
7 changes: 7 additions & 0 deletions test/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import sys
import typing as t

import pandas as pd
import pytest
import requests
from influxdb import InfluxDBClient
Expand Down Expand Up @@ -205,6 +206,12 @@ def mqtt_json_sensor(topic, data):
return mqtt_sensor(topic, payload)


def mqtt_ndjson_sensor(topic, data):
    """
    Serialize `data` to NDJSON and publish it to the given MQTT topic.

    The records are loaded into a pandas DataFrame, rendered with one
    JSON document per line, and handed to `mqtt_sensor` for submission.

    :param topic: MQTT topic to publish to.
    :param data: Records to submit, as accepted by
                 `pandas.DataFrame.from_records`.
    :return: Whatever `mqtt_sensor` returns.
    """
    frame = pd.DataFrame.from_records(data)
    return mqtt_sensor(topic, frame.to_json(orient="records", lines=True))


def mqtt_sensor(topic, payload):

logger.info('MQTT: Submitting reading')
Expand Down