Skip to content

Commit 3801d49

Browse files
committed
Address pipe compatibility review comments
1 parent 0130fce commit 3801d49

6 files changed

Lines changed: 26 additions & 17 deletions

File tree

iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,9 @@ public final class ConfigNodeMessages {
216216
"Failed to drop trigger [%s], this trigger has not been created";
217217
public static final String FAILED_TO_DROP_UDF_THIS_UDF_HAS_NOT_BEEN_CREATED =
218218
"Failed to drop UDF [%s], this UDF has not been created";
219+
public static final String
220+
FAILED_TO_ENRICH_PIPE_WITH_ROOT_USER_FOR_COMPATIBILITY_BECAUSE_ROOT_USER_DOES_NOT_EXIST =
221+
"Failed to enrich pipe %s with root user for compatibility because root user %s does not exist.";
219222
public static final String FAILED_TO_FETCH_SCHEMAENGINE_BLACK_LIST_ON_DATANODE =
220223
"Failed to fetch schemaengine black list on DataNode {}, {}";
221224
public static final String FAILED_TO_GET_FIELD = "Failed to get field {}";

iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,9 @@ public final class ConfigNodeMessages {
212212
"Failed to drop trigger [%s], this trigger has not been created";
213213
public static final String FAILED_TO_DROP_UDF_THIS_UDF_HAS_NOT_BEEN_CREATED =
214214
"Failed to drop UDF [%s], this UDF has not been created";
215+
public static final String
216+
FAILED_TO_ENRICH_PIPE_WITH_ROOT_USER_FOR_COMPATIBILITY_BECAUSE_ROOT_USER_DOES_NOT_EXIST =
217+
"Failed to enrich pipe %s with root user for compatibility because root user %s does not exist.";
215218
public static final String FAILED_TO_FETCH_SCHEMAENGINE_BLACK_LIST_ON_DATANODE =
216219
"Failed to fetch schemaengine black list on DataNode {}, {}";
217220
public static final String FAILED_TO_GET_FIELD = "Failed to get field {}";

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ public ConfigManager() throws IOException {
376376
CQInfo cqInfo = new CQInfo();
377377
ExternalServiceInfo externalServiceInfo = new ExternalServiceInfo();
378378
this.permissionManager = createPermissionManager(authorInfo);
379-
PipeInfo pipeInfo = new PipeInfo(this.permissionManager::login4Pipe);
379+
PipeInfo pipeInfo = new PipeInfo(userName -> this.permissionManager.login4Pipe(userName, null));
380380
QuotaInfo quotaInfo = new QuotaInfo();
381381
TTLInfo ttlInfo = new TTLInfo();
382382
SubscriptionInfo subscriptionInfo = new SubscriptionInfo();

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
import java.util.List;
5050
import java.util.Objects;
5151
import java.util.Optional;
52-
import java.util.function.BiFunction;
52+
import java.util.function.Function;
5353

5454
public class PipeInfo implements SnapshotProcessor {
5555

@@ -62,10 +62,10 @@ public PipeInfo() throws IOException {
6262
this(null);
6363
}
6464

65-
public PipeInfo(final BiFunction<String, String, String> pipeUserPasswordProvider)
65+
public PipeInfo(final Function<String, String> pipeUserCurrentPasswordProvider)
6666
throws IOException {
6767
pipePluginInfo = new PipePluginInfo();
68-
pipeTaskInfo = new PipeTaskInfo(pipeUserPasswordProvider);
68+
pipeTaskInfo = new PipeTaskInfo(pipeUserCurrentPasswordProvider);
6969
}
7070

7171
public PipePluginInfo getPipePluginInfo() {

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@
8080
import java.util.Optional;
8181
import java.util.concurrent.atomic.AtomicBoolean;
8282
import java.util.concurrent.atomic.AtomicLong;
83-
import java.util.function.BiFunction;
83+
import java.util.function.Function;
8484
import java.util.stream.Collectors;
8585
import java.util.stream.StreamSupport;
8686

@@ -97,16 +97,17 @@ public class PipeTaskInfo implements SnapshotProcessor {
9797

9898
// Pure in-memory object, not involved in snapshot serialization and deserialization.
9999
private final PipeTaskInfoVersion pipeTaskInfoVersion;
100-
private final BiFunction<String, String, String> pipeUserPasswordProvider;
100+
// Accepts a username and returns its current stored password for pipe authentication.
101+
private final Function<String, String> pipeUserCurrentPasswordProvider;
101102

102103
public PipeTaskInfo() {
103104
this(null);
104105
}
105106

106-
public PipeTaskInfo(final BiFunction<String, String, String> pipeUserPasswordProvider) {
107+
public PipeTaskInfo(final Function<String, String> pipeUserCurrentPasswordProvider) {
107108
this.pipeMetaKeeper = new PipeMetaKeeper();
108109
this.pipeTaskInfoVersion = new PipeTaskInfoVersion();
109-
this.pipeUserPasswordProvider = pipeUserPasswordProvider;
110+
this.pipeUserCurrentPasswordProvider = pipeUserCurrentPasswordProvider;
110111
}
111112

112113
/////////////////////////////// Lock ///////////////////////////////
@@ -1022,7 +1023,7 @@ public void enrichPipeMetasWithRootUserForCompatibility() {
10221023
}
10231024

10241025
private void enrichPipeMetaWithRootUserForCompatibility(final PipeStaticMeta pipeStaticMeta) {
1025-
if (pipeUserPasswordProvider == null) {
1026+
if (pipeUserCurrentPasswordProvider == null) {
10261027
return;
10271028
}
10281029
final boolean shouldEnrichSource = pipeStaticMeta.mayNeedCompatibleRootUserForIoTDBSource();
@@ -1032,12 +1033,14 @@ private void enrichPipeMetaWithRootUserForCompatibility(final PipeStaticMeta pip
10321033
}
10331034

10341035
final String rootUserName = CommonDescriptor.getInstance().getConfig().getDefaultAdminName();
1035-
final String password = pipeUserPasswordProvider.apply(rootUserName, null);
1036+
final String password = pipeUserCurrentPasswordProvider.apply(rootUserName);
10361037
if (Objects.isNull(password)) {
10371038
throw new PipeException(
10381039
String.format(
1039-
"Failed to enrich pipe %s with root user for compatibility because root user %s does not exist.",
1040-
pipeStaticMeta.getPipeName(), rootUserName));
1040+
ConfigNodeMessages
1041+
.FAILED_TO_ENRICH_PIPE_WITH_ROOT_USER_FOR_COMPATIBILITY_BECAUSE_ROOT_USER_DOES_NOT_EXIST,
1042+
pipeStaticMeta.getPipeName(),
1043+
rootUserName));
10411044
}
10421045

10431046
if (shouldEnrichSource) {

iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public void testRecordDataNodePushPipeMetaExceptionsKeepsUserStoppedPipeOutOfAut
9696
public void testEnrichOldUserPipeWithRootUserForCompatibility() {
9797
final String rootUserName = CommonDescriptor.getInstance().getConfig().getDefaultAdminName();
9898
final String rootPassword = "root-current-password";
99-
pipeTaskInfo = new PipeTaskInfo((username, password) -> rootPassword);
99+
pipeTaskInfo = new PipeTaskInfo(username -> rootPassword);
100100

101101
createPipe("oldPipe", PipeStatus.STOPPED);
102102

@@ -124,7 +124,7 @@ public void testEnrichOldUserPipeWithRootUserForCompatibility() {
124124

125125
@Test
126126
public void testDoNotOverwritePipeWithUserForCompatibility() {
127-
pipeTaskInfo = new PipeTaskInfo((username, password) -> "root-current-password");
127+
pipeTaskInfo = new PipeTaskInfo(username -> "root-current-password");
128128

129129
createPipeWithSourceAttributes(
130130
"newPipe",
@@ -149,7 +149,7 @@ public void testDoNotOverwritePipeWithUserForCompatibility() {
149149

150150
@Test
151151
public void testDoNotEnrichSystemPipeForCompatibility() {
152-
pipeTaskInfo = new PipeTaskInfo((username, password) -> "root-current-password");
152+
pipeTaskInfo = new PipeTaskInfo(username -> "root-current-password");
153153

154154
createPipeWithSourceAttributes(
155155
PipeStaticMeta.generateSubscriptionPipeName("topic", "group"),
@@ -173,7 +173,7 @@ public void testDoNotEnrichSystemPipeForCompatibility() {
173173
public void testEnrichOldWriteBackSinkWithRootUserForCompatibility() {
174174
final String rootUserName = CommonDescriptor.getInstance().getConfig().getDefaultAdminName();
175175
final String rootPassword = "root-current-password";
176-
pipeTaskInfo = new PipeTaskInfo((username, password) -> rootPassword);
176+
pipeTaskInfo = new PipeTaskInfo(username -> rootPassword);
177177

178178
createPipeWithAttributes(
179179
"oldWriteBackPipe",
@@ -206,7 +206,7 @@ public void testEnrichOldWriteBackSinkWithRootUserForCompatibility() {
206206
@Test
207207
public void testEnrichLoadedPipeMetasWithRootUserForCompatibility() {
208208
final String rootPassword = "root-current-password";
209-
pipeTaskInfo = new PipeTaskInfo((username, password) -> rootPassword);
209+
pipeTaskInfo = new PipeTaskInfo(username -> rootPassword);
210210

211211
createPipeWithSourceAttributes(
212212
"loadedPipe",

0 commit comments

Comments
 (0)