Skip to content

Commit aaa5729

Browse files
authored
Fix PBTree flush for negative child address (#17955) (#17978)
* Fix PBTree flush for negative child address * Address PBTree flush review comments (cherry picked from commit 8d56ae7)
1 parent d1df104 commit aaa5729

3 files changed

Lines changed: 80 additions & 31 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java

Lines changed: 40 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ public Scheduler(
7676
this.releaseFlushStrategy = releaseFlushStrategy;
7777
}
7878

79-
private void executeFlush(CachedMTreeStore store, int regionId, AtomicInteger remainToFlush) {
79+
private void executeFlush(
80+
CachedMTreeStore store, int regionId, AtomicInteger remainToFlush, boolean propagateFailure) {
8081
IMemoryManager memoryManager = store.getMemoryManager();
8182
ISchemaFile file = store.getSchemaFile();
8283
LockManager lockManager = store.getLockManager();
@@ -97,6 +98,9 @@ private void executeFlush(CachedMTreeStore store, int regionId, AtomicInteger re
9798
regionId,
9899
e.getMessage(),
99100
e);
101+
if (propagateFailure) {
102+
throw new RuntimeException(e);
103+
}
100104
} finally {
101105
long time = System.currentTimeMillis() - startTime;
102106
if (time > 10_000) {
@@ -145,22 +149,26 @@ public synchronized CompletableFuture<Void> scheduleFlushAll() {
145149
CompletableFuture.runAsync(
146150
() -> {
147151
int regionId = entry.getKey();
148-
CachedMTreeStore store = entry.getValue();
149-
if (store == null) {
150-
// store has been closed
151-
return;
152-
}
153-
LockManager lockManager = store.getLockManager();
154-
lockManager.globalReadLock();
155-
if (!regionToStore.containsKey(regionId)) {
156-
// double check store have not been closed
157-
return;
158-
}
159152
try {
160-
executeFlush(store, regionId, null);
161-
executeRelease(store, false);
153+
CachedMTreeStore store = entry.getValue();
154+
if (store == null) {
155+
// store has been closed
156+
return;
157+
}
158+
LockManager lockManager = store.getLockManager();
159+
lockManager.globalReadLock();
160+
try {
161+
if (!regionToStore.containsKey(regionId)) {
162+
// double check store have not been closed
163+
return;
164+
}
165+
executeFlush(store, regionId, null, true);
166+
executeRelease(store, false);
167+
} finally {
168+
lockManager.globalReadUnlock();
169+
}
162170
} finally {
163-
lockManager.globalReadUnlock();
171+
flushingRegionSet.remove(regionId);
164172
}
165173
},
166174
workerPool))
@@ -221,22 +229,25 @@ public synchronized void scheduleFlush(List<Integer> regionIds) {
221229
flushingRegionSet.add(regionId);
222230
workerPool.submit(
223231
() -> {
224-
CachedMTreeStore store = regionToStore.get(regionId);
225-
if (store == null) {
226-
// store has been closed
227-
return;
228-
}
229-
LockManager lockManager = store.getLockManager();
230-
lockManager.globalReadLock();
231-
if (!regionToStore.containsKey(regionId)) {
232-
// double check store have not been closed
233-
return;
234-
}
235232
try {
236-
237-
executeFlush(store, regionId, remainToFlush);
233+
CachedMTreeStore store = regionToStore.get(regionId);
234+
if (store == null) {
235+
// store has been closed
236+
return;
237+
}
238+
LockManager lockManager = store.getLockManager();
239+
lockManager.globalReadLock();
240+
try {
241+
if (!regionToStore.containsKey(regionId)) {
242+
// double check store have not been closed
243+
return;
244+
}
245+
executeFlush(store, regionId, remainToFlush, false);
246+
} finally {
247+
lockManager.globalReadUnlock();
248+
}
238249
} finally {
239-
lockManager.globalReadUnlock();
250+
flushingRegionSet.remove(regionId);
240251
}
241252
});
242253
if (remainToFlush.get() <= 0) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -292,8 +292,6 @@ private void writeUpdatedChildren(ICachedMNode node, SchemaPageContext cxt)
292292
.entrySet()) {
293293
child = entry.getValue();
294294
actualAddress = getTargetSegmentAddress(curSegAddr, entry.getKey(), cxt);
295-
childBuffer = RecordUtils.node2Buffer(child);
296-
297295
curPage = getPageInstance(SchemaFile.getPageIndex(actualAddress), cxt);
298296
if (curPage.getAsSegmentedPage().read(SchemaFile.getSegIndex(actualAddress), entry.getKey())
299297
== null) {
@@ -302,6 +300,13 @@ private void writeUpdatedChildren(ICachedMNode node, SchemaPageContext cxt)
302300
"Node[%s] has no child[%s] in pbtree file.", node.getName(), entry.getKey()));
303301
}
304302

303+
if (!child.isMeasurement() && getNodeAddress(child) < 0) {
304+
short estSegSize = estimateSegmentSize(child);
305+
long glbIndex = preAllocateSegment(estSegSize, cxt);
306+
SchemaFile.setNodeAddress(child, glbIndex);
307+
}
308+
childBuffer = RecordUtils.node2Buffer(child);
309+
305310
// prepare alias comparison
306311
if (child.isMeasurement() && child.getAsMeasurementMNode().getAlias() != null) {
307312
alias = child.getAsMeasurementMNode().getAlias();

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,39 @@ public void test2KMeasurement() throws MetadataException, IOException {
315315
sf.close();
316316
}
317317

318+
@Test
319+
public void testFlushUpdatedChildWithNegativeSegmentAddress() throws Exception {
320+
ISchemaFile sf = SchemaFile.initSchemaFile("root.sg", TEST_SCHEMA_REGION_ID);
321+
ICachedMNode sgNode = nodeFactory.createDatabaseDeviceMNode(null, "sg").getAsMNode();
322+
ICachedMNode device = nodeFactory.createDeviceMNode(sgNode, "d1").getAsMNode();
323+
sgNode.addChild(device);
324+
325+
writeMNodeInTest(sf, sgNode);
326+
327+
// Typical flush order: the parent already has this child record on disk, while the updated
328+
// child object in memory may still have no valid segment address.
329+
ICachedMNodeContainer.getCachedMNodeContainer(device).setSegmentAddress(-1L);
330+
addNodeToUpdateBuffer(sgNode, device);
331+
writeMNodeInTest(sf, sgNode);
332+
333+
Assert.assertTrue(getSegAddrInContainer(device) >= 0);
334+
335+
device.addChild(getMeasurementNode(device, "s1", "alias_s1"));
336+
writeMNodeInTest(sf, device);
337+
Assert.assertEquals(
338+
"alias_s1", sf.getChildNode(device, "s1").getAsMeasurementMNode().getAlias());
339+
340+
long deviceSegmentAddress = getSegAddrInContainer(device);
341+
sf.close();
342+
343+
sf = SchemaFile.loadSchemaFile("root.sg", TEST_SCHEMA_REGION_ID);
344+
ICachedMNode loadedDevice = sf.getChildNode(sgNode, "d1");
345+
Assert.assertEquals(deviceSegmentAddress, getSegAddrInContainer(loadedDevice));
346+
Assert.assertEquals(
347+
"alias_s1", sf.getChildNode(loadedDevice, "s1").getAsMeasurementMNode().getAlias());
348+
sf.close();
349+
}
350+
318351
@Test
319352
public void testMassiveSegment() throws MetadataException, IOException {
320353
ICachedMNode dbNode = nodeFactory.createDatabaseDeviceMNode(null, "sgRoot");

0 commit comments

Comments
 (0)