Skip to content

Commit ed6357c

Browse files
author
shiwenyan
committed
Add mqtt IT for json mode.
1 parent d90397d commit ed6357c

2 files changed

Lines changed: 346 additions & 0 deletions

File tree

Lines changed: 345 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,345 @@
1+
package org.apache.iotdb.db.it.mqtt;
2+
3+
import org.apache.iotdb.isession.ISession;
4+
import org.apache.iotdb.isession.SessionDataSet;
5+
import org.apache.iotdb.it.env.EnvFactory;
6+
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
7+
import org.apache.iotdb.it.framework.IoTDBTestRunner;
8+
import org.apache.iotdb.itbase.category.ClusterIT;
9+
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
10+
import org.apache.iotdb.itbase.env.BaseEnv;
11+
import org.apache.iotdb.rpc.StatementExecutionException;
12+
13+
import org.apache.tsfile.read.common.Field;
14+
import org.apache.tsfile.read.common.RowRecord;
15+
import org.awaitility.Awaitility;
16+
import org.fusesource.mqtt.client.BlockingConnection;
17+
import org.fusesource.mqtt.client.MQTT;
18+
import org.fusesource.mqtt.client.QoS;
19+
import org.junit.After;
20+
import org.junit.Before;
21+
import org.junit.Test;
22+
import org.junit.experimental.categories.Category;
23+
import org.junit.runner.RunWith;
24+
25+
import java.io.IOException;
26+
import java.util.List;
27+
import java.util.concurrent.TimeUnit;
28+
29+
import static org.junit.Assert.assertEquals;
30+
import static org.junit.Assert.assertTrue;
31+
import static org.junit.Assert.fail;
32+
33+
/**
34+
* Integration tests for MQTT service with JSON payload formatter. JSON formatter supports tree
35+
* model data insertion.
36+
*/
37+
@RunWith(IoTDBTestRunner.class)
38+
@Category({LocalStandaloneIT.class, ClusterIT.class})
39+
public class IoTDBMQTTServiceJsonIT {
40+
41+
private BlockingConnection connection;
42+
private static final String IP = System.getProperty("RemoteIp", "127.0.0.1");
43+
private static final String USER = System.getProperty("RemoteUser", "root");
44+
private static final String PASSWORD = System.getProperty("RemotePassword", "root");
45+
public static final String FORMATTER = "json";
46+
47+
@Before
48+
public void setUp() throws Exception {
49+
BaseEnv baseEnv = EnvFactory.getEnv();
50+
baseEnv.getConfig().getDataNodeConfig().setEnableMQTTService(true);
51+
baseEnv.getConfig().getDataNodeConfig().setMqttPayloadFormatter(FORMATTER);
52+
baseEnv.initClusterEnvironment();
53+
DataNodeWrapper portConflictDataNodeWrapper = EnvFactory.getEnv().getDataNodeWrapper(0);
54+
int port = portConflictDataNodeWrapper.getMqttPort();
55+
MQTT mqtt = new MQTT();
56+
mqtt.setHost(IP, port);
57+
mqtt.setUserName(USER);
58+
mqtt.setPassword(PASSWORD);
59+
mqtt.setConnectAttemptsMax(3);
60+
mqtt.setReconnectDelay(10);
61+
mqtt.setClientId("jsonClientId1");
62+
63+
connection = mqtt.blockingConnection();
64+
connection.connect();
65+
}
66+
67+
@After
68+
public void tearDown() throws Exception {
69+
try {
70+
if (connection != null) {
71+
connection.disconnect();
72+
}
73+
} catch (IOException e) {
74+
e.printStackTrace();
75+
fail(e.getMessage());
76+
}
77+
EnvFactory.getEnv().cleanClusterEnvironment();
78+
}
79+
80+
/** Test single JSON message with multiple measurements */
81+
@Test
82+
public void testSingleJsonMessage() throws Exception {
83+
try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
84+
String payload =
85+
"{"
86+
+ "\"device\":\"root.sg.d1\","
87+
+ "\"timestamp\":1,"
88+
+ "\"measurements\":[\"s1\",\"s2\"],"
89+
+ "\"values\":[1.5,2.5]"
90+
+ "}";
91+
92+
Awaitility.await()
93+
.atMost(3, TimeUnit.MINUTES)
94+
.pollInterval(1, TimeUnit.SECONDS)
95+
.until(
96+
() -> {
97+
connection.publish("root.sg.d1", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
98+
try (final SessionDataSet dataSet =
99+
session.executeQueryStatement("select s1, s2 from root.sg.d1 where time = 1")) {
100+
if (!dataSet.hasNext()) {
101+
return false;
102+
}
103+
RowRecord row = dataSet.next();
104+
List<Field> fields = row.getFields();
105+
assertEquals(2, fields.size());
106+
assertEquals(1.5, fields.get(0).getDoubleV(), 0.001);
107+
assertEquals(2.5, fields.get(1).getDoubleV(), 0.001);
108+
return true;
109+
} catch (StatementExecutionException e) {
110+
if (e.getMessage() != null && e.getMessage().contains("does not exist")) {
111+
return false;
112+
} else {
113+
throw e;
114+
}
115+
}
116+
});
117+
}
118+
}
119+
120+
/** Test batch JSON message with timestamps array */
121+
@Test
122+
public void testBatchJsonMessage() throws Exception {
123+
try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
124+
String payload =
125+
"{"
126+
+ "\"device\":\"root.sg.d2\","
127+
+ "\"timestamps\":[1,2,3],"
128+
+ "\"measurements\":[\"s1\",\"s2\"],"
129+
+ "\"values\":[[1.0,2.0],[3.0,4.0],[5.0,6.0]]"
130+
+ "}";
131+
132+
Awaitility.await()
133+
.atMost(3, TimeUnit.MINUTES)
134+
.pollInterval(1, TimeUnit.SECONDS)
135+
.until(
136+
() -> {
137+
connection.publish("root.sg.d2", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
138+
try (final SessionDataSet dataSet =
139+
session.executeQueryStatement("select count(s1) from root.sg.d2")) {
140+
if (!dataSet.hasNext()) {
141+
return false;
142+
}
143+
RowRecord row = dataSet.next();
144+
// Should have 3 records
145+
assertEquals(3, row.getFields().get(0).getLongV());
146+
return true;
147+
} catch (StatementExecutionException e) {
148+
if (e.getMessage() != null && e.getMessage().contains("does not exist")) {
149+
return false;
150+
} else {
151+
throw e;
152+
}
153+
}
154+
});
155+
}
156+
}
157+
158+
/** Test JSON array with multiple messages */
159+
@Test
160+
public void testJsonArray() throws Exception {
161+
try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
162+
String payload =
163+
"["
164+
+ "{\"device\":\"root.sg.d3\",\"timestamp\":1,\"measurements\":[\"s1\"],\"values\":[10.0]},"
165+
+ "{\"device\":\"root.sg.d3\",\"timestamp\":2,\"measurements\":[\"s1\"],\"values\":[20.0]},"
166+
+ "{\"device\":\"root.sg.d3\",\"timestamp\":3,\"measurements\":[\"s1\"],\"values\":[30.0]}"
167+
+ "]";
168+
169+
Awaitility.await()
170+
.atMost(3, TimeUnit.MINUTES)
171+
.pollInterval(1, TimeUnit.SECONDS)
172+
.until(
173+
() -> {
174+
connection.publish("root.sg.d3", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
175+
try (final SessionDataSet dataSet =
176+
session.executeQueryStatement("select s1 from root.sg.d3")) {
177+
int count = 0;
178+
double sum = 0;
179+
while (dataSet.hasNext()) {
180+
RowRecord row = dataSet.next();
181+
sum += row.getFields().get(0).getDoubleV();
182+
count++;
183+
}
184+
if (count != 3) {
185+
return false;
186+
}
187+
// sum should be 10 + 20 + 30 = 60
188+
assertEquals(60.0, sum, 0.001);
189+
return true;
190+
} catch (StatementExecutionException e) {
191+
if (e.getMessage() != null && e.getMessage().contains("does not exist")) {
192+
return false;
193+
} else {
194+
throw e;
195+
}
196+
}
197+
});
198+
}
199+
}
200+
201+
/** Test JSON with explicit data types */
202+
@Test
203+
public void testJsonWithDataTypes() throws Exception {
204+
try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
205+
String payload =
206+
"{"
207+
+ "\"device\":\"root.sg.d4\","
208+
+ "\"timestamp\":1,"
209+
+ "\"measurements\":[\"intVal\",\"floatVal\",\"boolVal\",\"textVal\"],"
210+
+ "\"values\":[100,3.14,true,\"hello\"],"
211+
+ "\"datatypes\":[\"INT32\",\"FLOAT\",\"BOOLEAN\",\"TEXT\"]"
212+
+ "}";
213+
214+
Awaitility.await()
215+
.atMost(3, TimeUnit.MINUTES)
216+
.pollInterval(1, TimeUnit.SECONDS)
217+
.until(
218+
() -> {
219+
connection.publish("root.sg.d4", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
220+
try (final SessionDataSet dataSet =
221+
session.executeQueryStatement(
222+
"select intVal, floatVal, boolVal, textVal from root.sg.d4 where time = 1")) {
223+
if (!dataSet.hasNext()) {
224+
return false;
225+
}
226+
List<Field> fields = dataSet.next().getFields();
227+
assertEquals(4, fields.size());
228+
assertEquals(100, fields.get(0).getIntV());
229+
assertEquals(3.14f, fields.get(1).getFloatV(), 0.01);
230+
assertTrue(fields.get(2).getBoolV());
231+
assertEquals("hello", fields.get(3).getStringValue());
232+
return true;
233+
} catch (StatementExecutionException e) {
234+
if (e.getMessage() != null && e.getMessage().contains("does not exist")) {
235+
return false;
236+
} else {
237+
throw e;
238+
}
239+
}
240+
});
241+
}
242+
}
243+
244+
/** Test multiple devices in single JSON array */
245+
@Test
246+
public void testMultipleDevicesJsonArray() throws Exception {
247+
try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
248+
String payload =
249+
"["
250+
+ "{\"device\":\"root.sg.device1\",\"timestamp\":1,\"measurements\":[\"temp\"],\"values\":[25.5]},"
251+
+ "{\"device\":\"root.sg.device2\",\"timestamp\":1,\"measurements\":[\"temp\"],\"values\":[26.5]},"
252+
+ "{\"device\":\"root.sg.device3\",\"timestamp\":1,\"measurements\":[\"temp\"],\"values\":[27.5]}"
253+
+ "]";
254+
255+
Awaitility.await()
256+
.atMost(3, TimeUnit.MINUTES)
257+
.pollInterval(1, TimeUnit.SECONDS)
258+
.until(
259+
() -> {
260+
connection.publish("root.sg", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
261+
try {
262+
// Check device1
263+
try (final SessionDataSet dataSet1 =
264+
session.executeQueryStatement(
265+
"select temp from root.sg.device1 where time = 1")) {
266+
if (!dataSet1.hasNext()) {
267+
return false;
268+
}
269+
assertEquals(25.5, dataSet1.next().getFields().get(0).getDoubleV(), 0.001);
270+
}
271+
// Check device2
272+
try (final SessionDataSet dataSet2 =
273+
session.executeQueryStatement(
274+
"select temp from root.sg.device2 where time = 1")) {
275+
if (!dataSet2.hasNext()) {
276+
return false;
277+
}
278+
assertEquals(26.5, dataSet2.next().getFields().get(0).getDoubleV(), 0.001);
279+
}
280+
// Check device3
281+
try (final SessionDataSet dataSet3 =
282+
session.executeQueryStatement(
283+
"select temp from root.sg.device3 where time = 1")) {
284+
if (!dataSet3.hasNext()) {
285+
return false;
286+
}
287+
assertEquals(27.5, dataSet3.next().getFields().get(0).getDoubleV(), 0.001);
288+
}
289+
return true;
290+
} catch (StatementExecutionException e) {
291+
if (e.getMessage() != null && e.getMessage().contains("does not exist")) {
292+
return false;
293+
} else {
294+
throw e;
295+
}
296+
}
297+
});
298+
}
299+
}
300+
301+
/** Test batch JSON with different values per timestamp */
302+
@Test
303+
public void testBatchJsonWithVariousValues() throws Exception {
304+
try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
305+
String payload =
306+
"{"
307+
+ "\"device\":\"root.sg.d5\","
308+
+ "\"timestamps\":[100,200,300,400,500],"
309+
+ "\"measurements\":[\"temperature\",\"humidity\"],"
310+
+ "\"values\":[[20.1,60.0],[21.2,61.5],[22.3,62.0],[23.4,63.5],[24.5,64.0]]"
311+
+ "}";
312+
313+
Awaitility.await()
314+
.atMost(3, TimeUnit.MINUTES)
315+
.pollInterval(1, TimeUnit.SECONDS)
316+
.until(
317+
() -> {
318+
connection.publish("root.sg.d5", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
319+
try (final SessionDataSet dataSet =
320+
session.executeQueryStatement("select temperature, humidity from root.sg.d5")) {
321+
int count = 0;
322+
while (dataSet.hasNext()) {
323+
RowRecord row = dataSet.next();
324+
List<Field> fields = row.getFields();
325+
assertEquals(2, fields.size());
326+
// Temperature should be between 20 and 25
327+
double temp = fields.get(0).getDoubleV();
328+
assertTrue(temp >= 20.0 && temp <= 25.0);
329+
// Humidity should be between 60 and 65
330+
double humidity = fields.get(1).getDoubleV();
331+
assertTrue(humidity >= 60.0 && humidity <= 65.0);
332+
count++;
333+
}
334+
return count == 5;
335+
} catch (StatementExecutionException e) {
336+
if (e.getMessage() != null && e.getMessage().contains("does not exist")) {
337+
return false;
338+
} else {
339+
throw e;
340+
}
341+
}
342+
});
343+
}
344+
}
345+
}

integration-test/src/test/java/org/apache/iotdb/relational/it/mqtt/IoTDBMQTTServiceIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public void setUp() throws Exception {
6969
mqtt.setPassword(PASSWORD);
7070
mqtt.setConnectAttemptsMax(3);
7171
mqtt.setReconnectDelay(10);
72+
mqtt.setClientId("clientId01");
7273

7374
connection = mqtt.blockingConnection();
7475
connection.connect();

0 commit comments

Comments
 (0)