Skip to content

Commit 4e06a94

Browse files
authored
Pipe: serialize sink transfers by region (#17946) (#17970)
(cherry picked from commit 5e6f1c2)
1 parent fc581dc commit 4e06a94

5 files changed

Lines changed: 58 additions & 26 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSinkParallelIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ public void testIoTConnectorParallel() throws Exception {
6060
connectorAttributes.put("connector.batch.enable", "false");
6161
connectorAttributes.put("connector.ip", receiverIp);
6262
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
63+
connectorAttributes.put("connector.serialize-by-region", "false");
6364
connectorAttributes.put("connector.parallel.tasks", "3");
6465

6566
final TSStatus status =

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
package org.apache.iotdb.db.pipe.agent.task.subtask.sink;
2121

2222
import org.apache.iotdb.commons.consensus.DataRegionId;
23-
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
2423
import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
2524
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
2625
import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
@@ -65,11 +64,10 @@ public synchronized String register(
6564
final Supplier<? extends PipeSinkSubtaskExecutor> executorSupplier,
6665
final PipeParameters pipeSinkParameters,
6766
final PipeTaskSinkRuntimeEnvironment environment) {
67+
final String connectorName =
68+
PipeSinkConstant.getConnectorOrSinkNameWithDefault(pipeSinkParameters);
6869
final String connectorKey =
69-
pipeSinkParameters
70-
.getStringOrDefault(
71-
Arrays.asList(PipeSinkConstant.CONNECTOR_KEY, PipeSinkConstant.SINK_KEY),
72-
BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
70+
connectorName
7371
// Convert the value of `CONNECTOR_KEY` or `SINK_KEY` to lowercase
7472
// for matching in `CONNECTOR_CONSTRUCTORS`
7573
.toLowerCase();
@@ -87,29 +85,30 @@ public synchronized String register(
8785

8886
final int sinkNum;
8987
boolean realTimeFirst = false;
88+
boolean serializeByRegion = false;
9089
String attributeSortedString = generateAttributeSortedString(pipeSinkParameters);
9190
if (isDataSinkConnector) {
91+
serializeByRegion = PipeSinkConstant.isSerializeByRegionEnabled(pipeSinkParameters);
9292
sinkNum =
93-
pipeSinkParameters.getIntOrDefault(
94-
Arrays.asList(
95-
PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
96-
PipeSinkConstant.SINK_IOTDB_PARALLEL_TASKS_KEY),
97-
PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains(
98-
pipeSinkParameters
99-
.getStringOrDefault(
100-
Arrays.asList(
101-
PipeSinkConstant.CONNECTOR_KEY, PipeSinkConstant.SINK_KEY),
102-
BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName())
103-
.toLowerCase())
104-
? 1
105-
: PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
93+
serializeByRegion
94+
? 1
95+
: pipeSinkParameters.getIntOrDefault(
96+
Arrays.asList(
97+
PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
98+
PipeSinkConstant.SINK_IOTDB_PARALLEL_TASKS_KEY),
99+
PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains(connectorKey)
100+
? 1
101+
: PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
106102
realTimeFirst =
107103
pipeSinkParameters.getBooleanOrDefault(
108104
Arrays.asList(
109105
PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY,
110106
PipeSinkConstant.SINK_REALTIME_FIRST_KEY),
111107
PipeSinkConstant.CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE);
112-
attributeSortedString = "data_" + attributeSortedString;
108+
attributeSortedString =
109+
serializeByRegion
110+
? "data_region_" + environment.getRegionId() + "_" + attributeSortedString
111+
: "data_" + attributeSortedString;
113112
} else {
114113
// Do not allow parallel tasks for schema region connectors
115114
// to avoid the potential disorder of the schema region data transfer

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
2828
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
2929
import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
30+
import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink;
3031
import org.apache.iotdb.db.pipe.source.dataregion.IoTDBDataRegionSource;
3132
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
3233

@@ -132,5 +133,20 @@ public void testPipePluginAgent() {
132133
}
133134
}))
134135
.getClass());
136+
Assert.assertEquals(
137+
IoTDBDataRegionSyncSink.class,
138+
agent.dataRegion().reflectSink(new PipeParameters(new HashMap<>())).getClass());
139+
Assert.assertEquals(
140+
IoTDBDataRegionAsyncSink.class,
141+
agent
142+
.dataRegion()
143+
.reflectSink(
144+
new PipeParameters(
145+
new HashMap<String, String>() {
146+
{
147+
put(PipeSinkConstant.CONNECTOR_SERIALIZE_BY_REGION_KEY, "false");
148+
}
149+
}))
150+
.getClass());
135151
}
136152
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipeSinkConstructor.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,11 @@
1919

2020
package org.apache.iotdb.commons.pipe.agent.plugin.constructor;
2121

22-
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
2322
import org.apache.iotdb.commons.pipe.agent.plugin.meta.PipePluginMetaKeeper;
2423
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
2524
import org.apache.iotdb.pipe.api.PipeConnector;
2625
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
2726

28-
import java.util.Arrays;
29-
3027
public abstract class PipeSinkConstructor extends PipePluginConstructor {
3128

3229
protected PipeSinkConstructor(PipePluginMetaKeeper pipePluginMetaKeeper) {
@@ -41,10 +38,7 @@ protected PipeSinkConstructor() {
4138
public final PipeConnector reflectPlugin(PipeParameters connectorParameters) {
4239
return (PipeConnector)
4340
reflectPluginByKey(
44-
connectorParameters
45-
.getStringOrDefault(
46-
Arrays.asList(PipeSinkConstant.CONNECTOR_KEY, PipeSinkConstant.SINK_KEY),
47-
BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
41+
PipeSinkConstant.getConnectorOrSinkNameWithDefault(connectorParameters)
4842
// Convert the value of `CONNECTOR_KEY` or `SINK_KEY` to lowercase for matching in
4943
// `PLUGIN_CONSTRUCTORS`
5044
.toLowerCase());

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.iotdb.commons.conf.CommonDescriptor;
2323
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
2424
import org.apache.iotdb.commons.pipe.config.PipeConfig;
25+
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
2526

2627
import com.github.luben.zstd.Zstd;
2728

@@ -68,6 +69,27 @@ public class PipeSinkConstant {
6869
public static final String SINK_REALTIME_FIRST_KEY = "sink.realtime-first";
6970
public static final boolean CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE = true;
7071

72+
public static final String CONNECTOR_SERIALIZE_BY_REGION_KEY = "connector.serialize-by-region";
73+
public static final String SINK_SERIALIZE_BY_REGION_KEY = "sink.serialize-by-region";
74+
public static final boolean CONNECTOR_SERIALIZE_BY_REGION_DEFAULT_VALUE = true;
75+
76+
public static boolean isSerializeByRegionEnabled(final PipeParameters parameters) {
77+
return parameters.getBooleanOrDefault(
78+
Arrays.asList(CONNECTOR_SERIALIZE_BY_REGION_KEY, SINK_SERIALIZE_BY_REGION_KEY),
79+
CONNECTOR_SERIALIZE_BY_REGION_DEFAULT_VALUE);
80+
}
81+
82+
public static String getConnectorOrSinkNameWithDefault(final PipeParameters parameters) {
83+
return parameters.getStringOrDefault(
84+
Arrays.asList(CONNECTOR_KEY, SINK_KEY), getDefaultConnectorOrSinkName(parameters));
85+
}
86+
87+
private static String getDefaultConnectorOrSinkName(final PipeParameters parameters) {
88+
return isSerializeByRegionEnabled(parameters)
89+
? BuiltinPipePlugin.IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName()
90+
: BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName();
91+
}
92+
7193
public static final String CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY = "connector.batch.enable";
7294
public static final String SINK_IOTDB_BATCH_MODE_ENABLE_KEY = "sink.batch.enable";
7395
public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE = true;

0 commit comments

Comments
 (0)