Skip to content

Commit c40805f

Browse files
committed
Merge remote-tracking branch 'origin/master' into codex/generic-ssl-config
# Conflicts: # iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/RpcUtilsTest.java
2 parents 2cde545 + 1df1913 commit c40805f

38 files changed

Lines changed: 1365 additions & 190 deletions

File tree

iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -151,16 +151,16 @@ public static void verifySuccessWithRedirectionForMultiDevices(
151151
}
152152

153153
public static void verifySuccess(List<TSStatus> statuses) throws BatchExecutionException {
154-
StringBuilder errMsgs =
155-
new StringBuilder().append(TSStatusCode.MULTIPLE_ERROR.getStatusCode()).append(": ");
154+
StringBuilder errMsgs = new StringBuilder();
156155
for (TSStatus status : statuses) {
157156
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
158157
&& status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
159158
errMsgs.append(status.getMessage()).append("; ");
160159
}
161160
}
162161
if (errMsgs.length() > 0) {
163-
throw new BatchExecutionException(statuses, errMsgs.toString());
162+
throw new BatchExecutionException(
163+
statuses, TSStatusCode.MULTIPLE_ERROR.getStatusCode() + ": " + errMsgs);
164164
}
165165
}
166166

@@ -181,9 +181,9 @@ public static TSStatus getStatus(List<TSStatus> statusList) {
181181
for (TSStatus subStatus : statusList) {
182182
if (subStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
183183
&& subStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
184-
if (!msgSet.contains(status)) {
185-
errMsg.append(status).append("; ");
186-
msgSet.add(status);
184+
if (!msgSet.contains(subStatus)) {
185+
errMsg.append(subStatus).append("; ");
186+
msgSet.add(subStatus);
187187
}
188188
}
189189
}

iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/RpcUtilsTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,15 @@
1919

2020
package org.apache.iotdb.rpc;
2121

22+
import org.apache.iotdb.common.rpc.thrift.TSStatus;
23+
2224
import org.junit.Assert;
2325
import org.junit.Test;
2426

2527
import java.time.ZoneId;
2628
import java.time.format.DateTimeFormatter;
29+
import java.util.Arrays;
30+
import java.util.Collections;
2731

2832
public class RpcUtilsTest {
2933

@@ -75,6 +79,27 @@ public void testIsSetSqlDialect() {
7579
Assert.assertFalse(RpcUtils.isSetSqlDialect("set sql_dia"));
7680
}
7781

82+
@Test
83+
public void testVerifySuccessListAllowsSuccessfulStatuses() throws BatchExecutionException {
84+
RpcUtils.verifySuccess(
85+
Arrays.asList(
86+
RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS),
87+
RpcUtils.getStatus(TSStatusCode.REDIRECTION_RECOMMEND)));
88+
}
89+
90+
@Test
91+
public void testVerifySuccessListThrowsOnFailure() {
92+
TSStatus failedStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "failed");
93+
94+
try {
95+
RpcUtils.verifySuccess(Collections.singletonList(failedStatus));
96+
Assert.fail("Expected BatchExecutionException");
97+
} catch (BatchExecutionException e) {
98+
Assert.assertEquals(Collections.singletonList(failedStatus), e.getStatusList());
99+
Assert.assertTrue(e.getMessage().contains("failed"));
100+
}
101+
}
102+
78103
@Test
79104
public void testStandardTlsProtocol() {
80105
Assert.assertEquals("TLS", RpcSslUtils.normalizeStandardTlsProtocol(null));

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@ public void getAINodeHeartBeat(
5454
client = clientManager.borrowClient(endPoint);
5555
client.getAIHeartbeat(req, handler);
5656
dispatched = true;
57-
} catch (Exception ignore) {
58-
// Just ignore
57+
} catch (Exception e) {
58+
handleError(handler, e);
5959
} finally {
6060
// After the async call is dispatched, the client's onComplete/onError callback is
6161
// responsible for returning the client. If the RPC was not dispatched (exception
@@ -67,6 +67,14 @@ public void getAINodeHeartBeat(
6767
}
6868
}
6969

70+
private void handleError(final AINodeHeartbeatHandler handler, final Exception e) {
71+
try {
72+
handler.onError(e);
73+
} catch (final Exception ignore) {
74+
// Ignore handler failures in heartbeat best-effort path.
75+
}
76+
}
77+
7078
private static class AsyncAINodeHeartbeatClientPoolHolder {
7179

7280
private static final AsyncAINodeHeartbeatClientPool INSTANCE =

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ public void getConfigNodeHeartBeat(
5555
client = clientManager.borrowClient(endPoint);
5656
client.getConfigNodeHeartBeat(heartbeatReq, handler);
5757
dispatched = true;
58-
} catch (Exception ignore) {
59-
// Just ignore
58+
} catch (Exception e) {
59+
handleError(handler, e);
6060
} finally {
6161
// After the async call is dispatched, the client's onComplete/onError callback is
6262
// responsible for returning the client. If the RPC was not dispatched (exception
@@ -68,6 +68,14 @@ public void getConfigNodeHeartBeat(
6868
}
6969
}
7070

71+
private void handleError(final ConfigNodeHeartbeatHandler handler, final Exception e) {
72+
try {
73+
handler.onError(e);
74+
} catch (final Exception ignore) {
75+
// Ignore handler failures in heartbeat best-effort path.
76+
}
77+
}
78+
7179
private static class AsyncConfigNodeHeartbeatClientPoolHolder {
7280

7381
private static final AsyncConfigNodeHeartbeatClientPool INSTANCE =

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ public void getDataNodeHeartBeat(
5656
client = clientManager.borrowClient(endPoint);
5757
client.getDataNodeHeartBeat(req, handler);
5858
dispatched = true;
59-
} catch (Exception ignore) {
60-
// Just ignore
59+
} catch (Exception e) {
60+
handleError(handler, e);
6161
} finally {
6262
returnClientIfNotDispatched(endPoint, client, dispatched);
6363
}
@@ -72,7 +72,7 @@ public void writeAuditLog(
7272
client.writeAuditLog(req, handler);
7373
dispatched = true;
7474
} catch (Exception e) {
75-
// Just ignore
75+
handleError(handler, e);
7676
} finally {
7777
returnClientIfNotDispatched(endPoint, client, dispatched);
7878
}
@@ -89,6 +89,22 @@ private void returnClientIfNotDispatched(
8989
}
9090
}
9191

92+
private void handleError(final DataNodeHeartbeatHandler handler, final Exception e) {
93+
try {
94+
handler.onError(e);
95+
} catch (final Exception ignore) {
96+
// Ignore handler failures in heartbeat best-effort path.
97+
}
98+
}
99+
100+
private void handleError(final DataNodeWriteAuditLogHandler handler, final Exception e) {
101+
try {
102+
handler.onError(e);
103+
} catch (final Exception ignore) {
104+
// Ignore handler failures in audit-log best-effort path.
105+
}
106+
}
107+
92108
private static class AsyncDataNodeHeartbeatClientPoolHolder {
93109

94110
private static final AsyncDataNodeHeartbeatClientPool INSTANCE =

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CheckTimeSeriesExistenceRPCHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,11 @@ public void onError(Exception e) {
8383
+ e.getMessage();
8484
LOGGER.error(errorMsg);
8585

86-
countDownLatch.countDown();
8786
TCheckTimeSeriesExistenceResp resp = new TCheckTimeSeriesExistenceResp();
8887
resp.setStatus(
8988
new TSStatus(
9089
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), errorMsg)));
9190
responseMap.put(requestId, resp);
91+
countDownLatch.countDown();
9292
}
9393
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CountPathsUsingTemplateRPCHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,11 @@ public void onError(Exception e) {
8383
+ e.getMessage();
8484
LOGGER.error(errorMsg);
8585

86-
countDownLatch.countDown();
8786
TCountPathsUsingTemplateResp resp = new TCountPathsUsingTemplateResp();
8887
resp.setStatus(
8988
new TSStatus(
9089
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), errorMsg)));
9190
responseMap.put(requestId, resp);
91+
countDownLatch.countDown();
9292
}
9393
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/FetchSchemaBlackListRPCHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,11 @@ public void onError(Exception e) {
8383
+ e.getMessage();
8484
LOGGER.error(errorMsg);
8585

86-
countDownLatch.countDown();
8786
TFetchSchemaBlackListResp resp = new TFetchSchemaBlackListResp();
8887
resp.setStatus(
8988
new TSStatus(
9089
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), errorMsg)));
9190
responseMap.put(requestId, resp);
91+
countDownLatch.countDown();
9292
}
9393
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/SchemaUpdateRPCHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,10 @@ public void onError(Exception e) {
7777
+ e.getMessage();
7878
LOGGER.warn(errorMsg);
7979

80-
countDownLatch.countDown();
8180
responseMap.put(
8281
requestId,
8382
new TSStatus(
8483
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), errorMsg)));
84+
countDownLatch.countDown();
8585
}
8686
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/CheckSchemaRegionUsingTemplateRPCHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,11 +85,11 @@ public void onError(Exception e) {
8585
+ e.getMessage();
8686
LOGGER.error(errorMsg);
8787

88-
countDownLatch.countDown();
8988
TCheckSchemaRegionUsingTemplateResp resp = new TCheckSchemaRegionUsingTemplateResp();
9089
resp.setStatus(
9190
new TSStatus(
9291
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), errorMsg)));
9392
responseMap.put(requestId, resp);
93+
countDownLatch.countDown();
9494
}
9595
}

0 commit comments

Comments
 (0)