Skip to content

Commit dc0e1a5

Browse files
committed
Pipe: serialize sink transfers by region
1 parent c39061a commit dc0e1a5

6 files changed

Lines changed: 64 additions & 27 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkParallelIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ public void testIoTConnectorParallel() throws Exception {
8080
connectorAttributes.put("connector.batch.enable", "true");
8181
connectorAttributes.put("connector.ip", receiverIp);
8282
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
83+
connectorAttributes.put("connector.serialize-by-region", "false");
8384
connectorAttributes.put("connector.parallel.tasks", "3");
8485

8586
final TSStatus status =

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSinkParallelIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public void testIoTConnectorParallel() throws Exception {
7171
sinkAttributes.put("sink.batch.enable", "false");
7272
sinkAttributes.put("sink.ip", receiverIp);
7373
sinkAttributes.put("sink.port", Integer.toString(receiverPort));
74+
sinkAttributes.put("sink.serialize-by-region", "false");
7475
sinkAttributes.put("sink.parallel.tasks", "3");
7576

7677
final TSStatus status =

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
package org.apache.iotdb.db.pipe.agent.task.subtask.sink;
2121

2222
import org.apache.iotdb.commons.consensus.DataRegionId;
23-
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
2423
import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
2524
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
2625
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
@@ -70,11 +69,10 @@ public synchronized String register(
7069
final Supplier<? extends PipeSinkSubtaskExecutor> executorSupplier,
7170
final PipeParameters pipeSinkParameters,
7271
final PipeTaskSinkRuntimeEnvironment environment) {
72+
final String connectorName =
73+
PipeSinkConstant.getConnectorOrSinkNameWithDefault(pipeSinkParameters);
7374
final String connectorKey =
74-
pipeSinkParameters
75-
.getStringOrDefault(
76-
Arrays.asList(PipeSinkConstant.CONNECTOR_KEY, PipeSinkConstant.SINK_KEY),
77-
BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
75+
connectorName
7876
// Convert the value of `CONNECTOR_KEY` or `SINK_KEY` to lowercase
7977
// for matching in `CONNECTOR_CONSTRUCTORS`
8078
.toLowerCase();
@@ -93,38 +91,43 @@ public synchronized String register(
9391

9492
final int sinkNum;
9593
boolean realTimeFirst = false;
94+
boolean serializeByRegion = false;
9695
String attributeSortedString = generateAttributeSortedString(pipeSinkParameters);
9796
final String attributeDisplayString = generateAttributeDisplayString(pipeSinkParameters);
9897
if (isDataRegionSink) {
98+
serializeByRegion = PipeSinkConstant.isSerializeByRegionEnabled(pipeSinkParameters);
9999
sinkNum =
100-
pipeSinkParameters.getIntOrDefault(
101-
Arrays.asList(
102-
PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
103-
PipeSinkConstant.SINK_IOTDB_PARALLEL_TASKS_KEY),
104-
PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains(
105-
pipeSinkParameters
106-
.getStringOrDefault(
107-
Arrays.asList(
108-
PipeSinkConstant.CONNECTOR_KEY, PipeSinkConstant.SINK_KEY),
109-
BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName())
110-
.toLowerCase())
111-
? 1
112-
: PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
100+
serializeByRegion
101+
? 1
102+
: pipeSinkParameters.getIntOrDefault(
103+
Arrays.asList(
104+
PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
105+
PipeSinkConstant.SINK_IOTDB_PARALLEL_TASKS_KEY),
106+
PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains(connectorKey)
107+
? 1
108+
: PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
113109
realTimeFirst =
114110
pipeSinkParameters.getBooleanOrDefault(
115111
Arrays.asList(
116112
PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY,
117113
PipeSinkConstant.SINK_REALTIME_FIRST_KEY),
118114
PipeSinkConstant.CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE);
119-
attributeSortedString = "data_" + attributeSortedString;
115+
attributeSortedString =
116+
serializeByRegion
117+
? "data_region_" + environment.getRegionId() + "_" + attributeSortedString
118+
: "data_" + attributeSortedString;
120119
} else {
121120
// Do not allow parallel tasks for schema region connectors
122121
// to avoid the potential disorder of the schema region data transfer
123122
sinkNum = 1;
124123
attributeSortedString = "schema_" + attributeSortedString;
125124
}
126125
final String attributeDisplayStringWithPrefix =
127-
isDataRegionSink ? "data_" + attributeDisplayString : "schema_" + attributeDisplayString;
126+
isDataRegionSink
127+
? serializeByRegion
128+
? "data_region_" + environment.getRegionId() + "_" + attributeDisplayString
129+
: "data_" + attributeDisplayString
130+
: "schema_" + attributeDisplayString;
128131
environment.setAttributeSortedString(attributeDisplayStringWithPrefix);
129132

130133
if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.iotdb.db.pipe.processor.iotconsensusv2.IoTConsensusV2Processor;
3030
import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.IoTConsensusV2AsyncSink;
3131
import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
32+
import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink;
3233
import org.apache.iotdb.db.pipe.source.dataregion.IoTDBDataRegionSource;
3334
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
3435

@@ -148,6 +149,21 @@ public void testPipePluginAgent() {
148149
}
149150
}))
150151
.getClass());
152+
Assert.assertEquals(
153+
IoTDBDataRegionSyncSink.class,
154+
agent.dataRegion().reflectSink(new PipeParameters(new HashMap<>())).getClass());
155+
Assert.assertEquals(
156+
IoTDBDataRegionAsyncSink.class,
157+
agent
158+
.dataRegion()
159+
.reflectSink(
160+
new PipeParameters(
161+
new HashMap<String, String>() {
162+
{
163+
put(PipeSinkConstant.CONNECTOR_SERIALIZE_BY_REGION_KEY, "false");
164+
}
165+
}))
166+
.getClass());
151167
Assert.assertEquals(
152168
IoTConsensusV2AsyncSink.class,
153169
agent

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipeSinkConstructor.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,11 @@
1919

2020
package org.apache.iotdb.commons.pipe.agent.plugin.constructor;
2121

22-
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
2322
import org.apache.iotdb.commons.pipe.agent.plugin.meta.PipePluginMetaKeeper;
2423
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
2524
import org.apache.iotdb.pipe.api.PipeConnector;
2625
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
2726

28-
import java.util.Arrays;
29-
3027
public abstract class PipeSinkConstructor extends PipePluginConstructor {
3128

3229
protected PipeSinkConstructor(PipePluginMetaKeeper pipePluginMetaKeeper) {
@@ -41,10 +38,7 @@ protected PipeSinkConstructor() {
4138
public final PipeConnector reflectPlugin(PipeParameters connectorParameters) {
4239
return (PipeConnector)
4340
reflectPluginByKey(
44-
connectorParameters
45-
.getStringOrDefault(
46-
Arrays.asList(PipeSinkConstant.CONNECTOR_KEY, PipeSinkConstant.SINK_KEY),
47-
BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
41+
PipeSinkConstant.getConnectorOrSinkNameWithDefault(connectorParameters)
4842
// Convert the value of `CONNECTOR_KEY` or `SINK_KEY` to lowercase for matching in
4943
// `PLUGIN_CONSTRUCTORS`
5044
.toLowerCase());

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.iotdb.commons.i18n.PipeMessages;
2424
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
2525
import org.apache.iotdb.commons.pipe.config.PipeConfig;
26+
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
2627

2728
import com.github.luben.zstd.Zstd;
2829

@@ -69,6 +70,27 @@ public class PipeSinkConstant {
6970
public static final String SINK_REALTIME_FIRST_KEY = "sink.realtime-first";
7071
public static final boolean CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE = true;
7172

73+
public static final String CONNECTOR_SERIALIZE_BY_REGION_KEY = "connector.serialize-by-region";
74+
public static final String SINK_SERIALIZE_BY_REGION_KEY = "sink.serialize-by-region";
75+
public static final boolean CONNECTOR_SERIALIZE_BY_REGION_DEFAULT_VALUE = true;
76+
77+
public static boolean isSerializeByRegionEnabled(final PipeParameters parameters) {
78+
return parameters.getBooleanOrDefault(
79+
Arrays.asList(CONNECTOR_SERIALIZE_BY_REGION_KEY, SINK_SERIALIZE_BY_REGION_KEY),
80+
CONNECTOR_SERIALIZE_BY_REGION_DEFAULT_VALUE);
81+
}
82+
83+
public static String getConnectorOrSinkNameWithDefault(final PipeParameters parameters) {
84+
return parameters.getStringOrDefault(
85+
Arrays.asList(CONNECTOR_KEY, SINK_KEY), getDefaultConnectorOrSinkName(parameters));
86+
}
87+
88+
private static String getDefaultConnectorOrSinkName(final PipeParameters parameters) {
89+
return isSerializeByRegionEnabled(parameters)
90+
? BuiltinPipePlugin.IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName()
91+
: BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName();
92+
}
93+
7294
public static final String CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY = "connector.batch.enable";
7395
public static final String SINK_IOTDB_BATCH_MODE_ENABLE_KEY = "sink.batch.enable";
7496
public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE = true;

0 commit comments

Comments
 (0)