Skip to content

Commit 833048f

Browse files
committed
Fix pipe tree database creation on receiver
1 parent ef0d9f8 commit 833048f

4 files changed

Lines changed: 115 additions & 8 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@
8989
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
9090
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
9191
import org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor;
92+
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.DatabaseSchemaTask;
9293
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.CreateDBTask;
9394
import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
9495
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.view.AlterLogicalViewNode;
@@ -133,6 +134,7 @@
133134
import java.util.Objects;
134135
import java.util.Optional;
135136
import java.util.Set;
137+
import java.util.concurrent.ConcurrentHashMap;
136138
import java.util.concurrent.ExecutionException;
137139
import java.util.concurrent.atomic.AtomicLong;
138140
import java.util.concurrent.atomic.AtomicReference;
@@ -166,7 +168,8 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
166168
this::executeStatementForTableModel);
167169
private final PipeTreeStatementDataTypeConvertExecutionVisitor
168170
treeStatementDataTypeConvertExecutionVisitor =
169-
new PipeTreeStatementDataTypeConvertExecutionVisitor(this::executeStatementForTreeModel);
171+
new PipeTreeStatementDataTypeConvertExecutionVisitor(
172+
statement -> executeStatementForTreeModel(statement, getTreeDatabaseName(statement)));
170173
public final PipeTreeStatementToBatchVisitor batchVisitor = new PipeTreeStatementToBatchVisitor();
171174

172175
// Used for data transfer: confignode (cluster A) -> datanode (cluster B) -> confignode (cluster
@@ -186,6 +189,7 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
186189
private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();
187190

188191
private PipeMemoryBlock allocatedMemoryBlock;
192+
private final Set<String> autoCreatedTreeDatabases = ConcurrentHashMap.newKeySet();
189193

190194
static {
191195
try {
@@ -988,6 +992,9 @@ private TSStatus executeStatementWithPermissionCheckAndRetryOnDataTypeMismatch(
988992
((InsertBaseStatement) statement).getDatabaseName().isPresent()
989993
? ((InsertBaseStatement) statement).getDatabaseName().get()
990994
: null;
995+
} else if (statement instanceof InsertBaseStatement) {
996+
isTableModelStatement = false;
997+
databaseName = getTreeDatabaseName(statement);
991998
} else {
992999
isTableModelStatement = false;
9931000
databaseName = null;
@@ -1038,7 +1045,7 @@ private TSStatus executeStatementWithPermissionCheckAndRetryOnDataTypeMismatch(
10381045
final TSStatus status =
10391046
isTableModelStatement
10401047
? executeStatementForTableModel(statement, databaseName)
1041-
: executeStatementForTreeModel(statement);
1048+
: executeStatementForTreeModel(statement, databaseName);
10421049

10431050
// Try to convert data type if the status code is not success. Insert statements normally return
10441051
// above after the first converted execution. The retry path is kept for load and fallback
@@ -1181,7 +1188,56 @@ private void autoCreateDatabaseIfNecessary(final String database) {
11811188
}
11821189
}
11831190

1184-
private TSStatus executeStatementForTreeModel(final Statement statement) {
1191+
private void autoCreateTreeDatabaseIfNecessary(final String database) {
1192+
if (database == null
1193+
|| LoadTsFileStatement.getDatabaseLevelByTreeDatabase(database) == null
1194+
|| autoCreatedTreeDatabases.contains(database)
1195+
|| !IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
1196+
return;
1197+
}
1198+
1199+
try {
1200+
final TSStatus status =
1201+
AuthorityChecker.getAccessControl()
1202+
.checkCanCreateDatabaseForTree(getUserEntity(), new PartialPath(database));
1203+
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
1204+
throw new PipeException(status.getMessage());
1205+
}
1206+
1207+
final DatabaseSchemaStatement statement =
1208+
new DatabaseSchemaStatement(DatabaseSchemaStatement.DatabaseSchemaStatementType.CREATE);
1209+
statement.setDatabasePath(new PartialPath(database));
1210+
final DatabaseSchemaTask task = new DatabaseSchemaTask(statement);
1211+
final ListenableFuture<ConfigTaskResult> future =
1212+
task.execute(ClusterConfigTaskExecutor.getInstance());
1213+
final ConfigTaskResult result = future.get();
1214+
final int statusCode = result.getStatusCode().getStatusCode();
1215+
if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()
1216+
|| statusCode == TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
1217+
autoCreatedTreeDatabases.add(database);
1218+
return;
1219+
}
1220+
if (statusCode != TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
1221+
throw new PipeException(
1222+
String.format(
1223+
"Auto create tree database failed: %s, status code: %s",
1224+
database, result.getStatusCode()));
1225+
}
1226+
} catch (final IllegalPathException e) {
1227+
throw new PipeException(String.format("Illegal tree database %s.", database), e);
1228+
} catch (final ExecutionException | InterruptedException e) {
1229+
if (e instanceof InterruptedException) {
1230+
Thread.currentThread().interrupt();
1231+
}
1232+
throw new PipeException(
1233+
DataNodePipeMessages.AUTO_CREATE_DATABASE_FAILED_BECAUSE + e.getMessage());
1234+
}
1235+
}
1236+
1237+
private TSStatus executeStatementForTreeModel(
1238+
final Statement statement, final String databaseName) {
1239+
autoCreateTreeDatabaseIfNecessary(databaseName);
1240+
11851241
return Coordinator.getInstance()
11861242
.executeForTreeModel(
11871243
shouldMarkAsPipeRequest.get() ? new PipeEnrichedStatement(statement) : statement,
@@ -1196,6 +1252,22 @@ private TSStatus executeStatementForTreeModel(final Statement statement) {
11961252
.status;
11971253
}
11981254

1255+
private IAuditEntity getUserEntity() {
1256+
return userEntity != null
1257+
? userEntity
1258+
: AuthorityChecker.createIAuditEntity(username, SESSION_MANAGER.getCurrSession());
1259+
}
1260+
1261+
private String getTreeDatabaseName(final Statement statement) {
1262+
if (statement instanceof LoadTsFileStatement) {
1263+
return ((LoadTsFileStatement) statement).getDatabase();
1264+
}
1265+
if (statement instanceof InsertBaseStatement) {
1266+
return ((InsertBaseStatement) statement).getDatabaseName().orElse(null);
1267+
}
1268+
return null;
1269+
}
1270+
11991271
private TSStatus executeStatementForTableModelWithPermissionCheck(
12001272
final org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Statement statement,
12011273
final String databaseName) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,12 +105,15 @@ public Optional<TSStatus> visitLoadFile(
105105
new TsFileInsertionEventScanParser(
106106
file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null, true)) {
107107
for (final Pair<Tablet, Boolean> tabletWithIsAligned : parser.toTabletWithIsAligneds()) {
108+
final InsertTabletStatement insertTabletStatement =
109+
PipeTransferTabletRawReq.toTPipeTransferRawReq(
110+
tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight())
111+
.constructStatement();
112+
if (loadTsFileStatement.getDatabase() != null) {
113+
insertTabletStatement.setDatabaseName(loadTsFileStatement.getDatabase());
114+
}
108115
final PipeConvertedInsertTabletStatement statement =
109-
new PipeConvertedInsertTabletStatement(
110-
PipeTransferTabletRawReq.toTPipeTransferRawReq(
111-
tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight())
112-
.constructStatement(),
113-
false);
116+
new PipeConvertedInsertTabletStatement(insertTabletStatement, false);
114117

115118
TSStatus result;
116119
try {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -652,6 +652,7 @@ private LoadTsFileStatement buildRetryTreeLoadStatement(
652652
.setConvertOnTypeMismatch(true);
653653
if (database != null) {
654654
statement.setDatabase(database);
655+
statement.updateDatabaseLevelByTreeDatabase();
655656
}
656657
if (isGeneratedByPipe) {
657658
statement.markIsGeneratedByPipe();

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,17 @@
2828
import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
2929
import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
3030
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadSingleTsFileNode;
31+
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
3132

3233
import org.junit.Assert;
3334
import org.junit.Before;
3435
import org.junit.Test;
3536
import org.mockito.Mock;
3637
import org.mockito.MockitoAnnotations;
3738

39+
import java.io.File;
40+
import java.lang.reflect.Method;
41+
3842
import static org.mockito.Mockito.mock;
3943
import static org.mockito.Mockito.spy;
4044
import static org.mockito.Mockito.when;
@@ -87,4 +91,31 @@ public void testGetPartitionQueryDatabaseForTableModelLoad() {
8791

8892
Assert.assertEquals("test", LoadTsFileScheduler.getPartitionQueryDatabase(node, false));
8993
}
94+
95+
@Test
96+
public void testBuildRetryTreeLoadStatementUpdatesDatabaseLevel() throws Exception {
97+
final LoadTsFileScheduler scheduler =
98+
new LoadTsFileScheduler(
99+
distributedQueryPlan,
100+
mock(MPPQueryContext.class),
101+
mock(QueryStateMachine.class),
102+
mock(IClientManager.class),
103+
mock(IPartitionFetcher.class),
104+
true);
105+
final Method method =
106+
LoadTsFileScheduler.class.getDeclaredMethod(
107+
"buildRetryTreeLoadStatement", String.class, boolean.class, String.class);
108+
method.setAccessible(true);
109+
110+
final File tsFile = File.createTempFile("test", ".tsfile");
111+
tsFile.deleteOnExit();
112+
113+
final LoadTsFileStatement statement =
114+
(LoadTsFileStatement)
115+
method.invoke(scheduler, tsFile.getAbsolutePath(), true, "root.test.sg_0");
116+
117+
Assert.assertEquals("root.test.sg_0", statement.getDatabase());
118+
Assert.assertEquals(2, statement.getDatabaseLevel());
119+
Assert.assertTrue(statement.isGeneratedByPipe());
120+
}
90121
}

0 commit comments

Comments
 (0)