Skip to content

Commit d583132

Browse files
authored
Fix compaction writer size checkpointing (#17941)
1 parent 3ebc264 commit d583132

8 files changed

Lines changed: 343 additions & 20 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java

Lines changed: 154 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,16 @@
2424
import org.apache.iotdb.db.i18n.StorageEngineMessages;
2525
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionLastTimeCheckFailedException;
2626
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary;
27+
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils.FollowingBatchCompactionAlignedChunkWriter;
2728
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.AlignedPageElement;
2829
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.ChunkMetadataElement;
2930
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.flushcontroller.AbstractCompactionFlushController;
3031
import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter;
3132
import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
3233

34+
import org.apache.tsfile.block.column.Column;
3335
import org.apache.tsfile.encrypt.EncryptParameter;
36+
import org.apache.tsfile.enums.TSDataType;
3437
import org.apache.tsfile.exception.write.PageException;
3538
import org.apache.tsfile.file.header.PageHeader;
3639
import org.apache.tsfile.file.metadata.ChunkMetadata;
@@ -66,6 +69,13 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
6669
// The index of the array corresponds to subTaskId.
6770
protected int[] chunkPointNumArray = new int[subTaskNum];
6871

72+
// Each sub task tracks the estimated total size of the points written to its current chunk.
73+
// The index of the array corresponds to subTaskId.
74+
protected long[] writtenPointTotalSizeArray = new long[subTaskNum];
75+
76+
// Whether each sub task's current chunk writer contains TEXT, STRING, BLOB or OBJECT.
77+
protected boolean[] hasVariableLengthTypeArray = new boolean[subTaskNum];
78+
6979
// used to control the target chunk size
7080
protected long targetChunkSize = IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
7181

@@ -77,7 +87,12 @@ public abstract class AbstractCompactionWriter implements AutoCloseable {
7787
@SuppressWarnings("squid:S1170")
7888
private final long checkPoint = (targetChunkPointNum >= 10 ? targetChunkPointNum : 10) / 10;
7989

80-
private long lastCheckIndex = 0;
90+
private final long[] lastCheckIndexArray = new long[subTaskNum];
91+
92+
// When the estimated size of written points reaches the next checkpoint, check the chunk size.
93+
private final long writtenPointTotalSizeCheckPoint = Math.max(targetChunkSize / 10, 1L);
94+
95+
private final long[] lastWrittenPointTotalSizeCheckIndexArray = new long[subTaskNum];
8196

8297
// if unsealed chunk size is lower than this, then deserialize next chunk no matter it is
8398
// overlapped or not
@@ -122,10 +137,24 @@ public ModEntry getTTLLowerBoundForCurrentDevice() {
122137
}
123138

124139
public void startMeasurement(String measurement, IChunkWriter chunkWriter, int subTaskId) {
125-
lastCheckIndex = 0;
140+
resetChunkWriterStatistics(subTaskId);
126141
lastTimeSet[subTaskId] = false;
127142
chunkWriters[subTaskId] = chunkWriter;
128143
measurementId[subTaskId] = measurement;
144+
hasVariableLengthTypeArray[subTaskId] = containsVariableLengthType(chunkWriter);
145+
}
146+
147+
private boolean containsVariableLengthType(IChunkWriter chunkWriter) {
148+
if (chunkWriter instanceof ChunkWriterImpl) {
149+
return ((ChunkWriterImpl) chunkWriter).getDataType().isBinary();
150+
}
151+
AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) chunkWriter;
152+
for (ValueChunkWriter valueChunkWriter : alignedChunkWriter.getValueChunkWriterList()) {
153+
if (valueChunkWriter.getDataType().isBinary()) {
154+
return true;
155+
}
156+
}
157+
return false;
129158
}
130159

131160
public abstract void endMeasurement(int subTaskId) throws IOException;
@@ -146,7 +175,9 @@ public void startMeasurement(String measurement, IChunkWriter chunkWriter, int s
146175
*/
147176
public abstract void checkAndMayFlushChunkMetadata() throws IOException;
148177

149-
protected void writeDataPoint(long timestamp, TsPrimitiveType value, IChunkWriter chunkWriter) {
178+
protected void writeDataPoint(
179+
long timestamp, TsPrimitiveType value, IChunkWriter chunkWriter, int subTaskId) {
180+
long writtenPointTotalSize = 0;
150181
if (chunkWriter instanceof ChunkWriterImpl) {
151182
ChunkWriterImpl chunkWriterImpl = (ChunkWriterImpl) chunkWriter;
152183
switch (chunkWriterImpl.getDataType()) {
@@ -155,6 +186,7 @@ protected void writeDataPoint(long timestamp, TsPrimitiveType value, IChunkWrite
155186
case BLOB:
156187
case OBJECT:
157188
chunkWriterImpl.write(timestamp, value.getBinary());
189+
writtenPointTotalSize += value.getBinary().getLength();
158190
break;
159191
case DOUBLE:
160192
chunkWriterImpl.write(timestamp, value.getDouble());
@@ -180,17 +212,103 @@ protected void writeDataPoint(long timestamp, TsPrimitiveType value, IChunkWrite
180212
} else {
181213
AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) chunkWriter;
182214
alignedChunkWriter.write(timestamp, value.getVector());
215+
if (hasVariableLengthTypeArray[subTaskId]) {
216+
writtenPointTotalSize = estimateWrittenPointTotalSize(value);
217+
}
218+
}
219+
chunkPointNumArray[subTaskId]++;
220+
if (hasVariableLengthTypeArray[subTaskId]) {
221+
writtenPointTotalSizeArray[subTaskId] += writtenPointTotalSize;
183222
}
184223
}
185224

225+
private long estimateWrittenPointTotalSize(TsPrimitiveType value) {
226+
long size = Long.BYTES;
227+
TsPrimitiveType[] vector = value.getVector();
228+
for (TsPrimitiveType tsPrimitiveType : vector) {
229+
if (tsPrimitiveType == null) {
230+
continue;
231+
}
232+
TSDataType dataType = tsPrimitiveType.getDataType();
233+
switch (dataType) {
234+
case TEXT:
235+
case STRING:
236+
case BLOB:
237+
case OBJECT:
238+
size += tsPrimitiveType.getBinary().getLength();
239+
break;
240+
case DOUBLE:
241+
case INT64:
242+
case TIMESTAMP:
243+
size += Long.BYTES;
244+
break;
245+
case INT32:
246+
case DATE:
247+
case FLOAT:
248+
size += Integer.BYTES;
249+
break;
250+
case BOOLEAN:
251+
size += 1;
252+
break;
253+
default:
254+
break;
255+
}
256+
}
257+
return size;
258+
}
259+
260+
protected long estimateWrittenPointTotalSize(TsBlock tsBlock) {
261+
int pointCount = tsBlock.getPositionCount();
262+
long size = (long) Long.BYTES * pointCount;
263+
Column[] columns = tsBlock.getValueColumns();
264+
for (Column column : columns) {
265+
TSDataType dataType = column.getDataType();
266+
if (dataType.isBinary()) {
267+
for (int j = 0; j < pointCount; j++) {
268+
if (column.isNull(j)) {
269+
continue;
270+
}
271+
size += column.getBinary(j).getLength();
272+
}
273+
continue;
274+
}
275+
// This is only a checkpoint estimate, so fixed-width columns count width * pointCount (nulls included).
276+
switch (dataType) {
277+
case DOUBLE:
278+
case INT64:
279+
case TIMESTAMP:
280+
size += (long) Long.BYTES * pointCount;
281+
break;
282+
case INT32:
283+
case DATE:
284+
case FLOAT:
285+
size += (long) Integer.BYTES * pointCount;
286+
break;
287+
case BOOLEAN:
288+
size += pointCount;
289+
break;
290+
default:
291+
break;
292+
}
293+
}
294+
return size;
295+
}
296+
186297
@SuppressWarnings("squid:S2445")
187298
protected void sealChunk(
188299
CompactionTsFileWriter targetWriter, IChunkWriter chunkWriter, int subTaskId)
189300
throws IOException {
190301
synchronized (targetWriter) {
191302
targetWriter.writeChunk(chunkWriter);
192303
}
304+
resetChunkWriterStatistics(subTaskId);
305+
}
306+
307+
private void resetChunkWriterStatistics(int subTaskId) {
193308
chunkPointNumArray[subTaskId] = 0;
309+
writtenPointTotalSizeArray[subTaskId] = 0;
310+
lastCheckIndexArray[subTaskId] = 0;
311+
lastWrittenPointTotalSizeCheckIndexArray[subTaskId] = 0;
194312
}
195313

196314
public abstract EncryptParameter getEncryptParameter();
@@ -214,7 +332,7 @@ protected void flushNonAlignedChunkToFileWriter(
214332
synchronized (targetWriter) {
215333
// seal last chunk to file writer
216334
targetWriter.writeChunk(chunkWriters[subTaskId]);
217-
chunkPointNumArray[subTaskId] = 0;
335+
resetChunkWriterStatistics(subTaskId);
218336
targetWriter.writeChunk(chunk, chunkMetadata);
219337
}
220338
}
@@ -232,7 +350,7 @@ protected void flushAlignedChunkToFileWriter(
232350
AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) chunkWriters[subTaskId];
233351
// seal last chunk to file writer
234352
targetWriter.writeChunk(alignedChunkWriter);
235-
chunkPointNumArray[subTaskId] = 0;
353+
resetChunkWriterStatistics(subTaskId);
236354

237355
targetWriter.markStartingWritingAligned();
238356

@@ -279,6 +397,9 @@ protected void flushNonAlignedPageToChunkWriter(
279397
chunkWriter.writePageHeaderAndDataIntoBuff(compressedPageData, pageHeader);
280398

281399
chunkPointNumArray[subTaskId] += pageHeader.getStatistics().getCount();
400+
if (hasVariableLengthTypeArray[subTaskId]) {
401+
writtenPointTotalSizeArray[subTaskId] += pageHeader.getSerializedPageSize();
402+
}
282403
}
283404

284405
public abstract boolean flushAlignedPage(AlignedPageElement alignedPageElement, int subTaskId)
@@ -303,29 +424,51 @@ protected void flushAlignedPageToChunkWriter(
303424
// flush new time page to chunk writer directly
304425
alignedChunkWriter.writePageHeaderAndDataIntoTimeBuff(compressedTimePageData, timePageHeader);
305426

427+
long writtenValuePageSize = 0;
306428
// flush new value pages to chunk writer directly
307429
for (int i = 0; i < valuePageHeaders.size(); i++) {
308-
if (valuePageHeaders.get(i) == null) {
430+
PageHeader valuePageHeader = valuePageHeaders.get(i);
431+
if (valuePageHeader == null) {
309432
// sub sensor does not exist in current file or value page has been deleted completely
310433
alignedChunkWriter.getValueChunkWriterByIndex(i).writeEmptyPageToPageBuffer();
311434
continue;
312435
}
313436
alignedChunkWriter.writePageHeaderAndDataIntoValueBuff(
314-
compressedValuePageDatas.get(i), valuePageHeaders.get(i), i);
437+
compressedValuePageDatas.get(i), valuePageHeader, i);
438+
if (hasVariableLengthTypeArray[subTaskId]) {
439+
writtenValuePageSize += valuePageHeader.getSerializedPageSize();
440+
}
315441
}
316442

317443
chunkPointNumArray[subTaskId] += timePageHeader.getStatistics().getCount();
444+
if (hasVariableLengthTypeArray[subTaskId]) {
445+
// Direct-flushed pages are already serialized, so use page size as checkpoint estimate.
446+
writtenPointTotalSizeArray[subTaskId] +=
447+
timePageHeader.getSerializedPageSize() + writtenValuePageSize;
448+
}
318449
}
319450

320451
protected void checkChunkSizeAndMayOpenANewChunk(
321452
CompactionTsFileWriter fileWriter, IChunkWriter chunkWriter, int subTaskId)
322453
throws IOException {
323-
if (chunkPointNumArray[subTaskId] >= (lastCheckIndex + 1) * checkPoint) {
324-
// if chunk point num reaches the check point, then check if the chunk size over threshold
325-
lastCheckIndex = chunkPointNumArray[subTaskId] / checkPoint;
454+
if (chunkWriter instanceof FollowingBatchCompactionAlignedChunkWriter
455+
&& chunkWriter.checkIsChunkSizeOverThreshold(targetChunkSize, targetChunkPointNum, false)) {
456+
sealChunk(fileWriter, chunkWriter, subTaskId);
457+
return;
458+
}
459+
boolean reachesPointCheckPoint =
460+
chunkPointNumArray[subTaskId] >= (lastCheckIndexArray[subTaskId] + 1) * checkPoint;
461+
boolean reachesSizeCheckPoint =
462+
hasVariableLengthTypeArray[subTaskId]
463+
&& writtenPointTotalSizeArray[subTaskId]
464+
>= (lastWrittenPointTotalSizeCheckIndexArray[subTaskId] + 1)
465+
* writtenPointTotalSizeCheckPoint;
466+
if (reachesPointCheckPoint || reachesSizeCheckPoint) {
467+
lastCheckIndexArray[subTaskId] = chunkPointNumArray[subTaskId] / checkPoint;
468+
lastWrittenPointTotalSizeCheckIndexArray[subTaskId] =
469+
writtenPointTotalSizeArray[subTaskId] / writtenPointTotalSizeCheckPoint;
326470
if (chunkWriter.checkIsChunkSizeOverThreshold(targetChunkSize, targetChunkPointNum, false)) {
327471
sealChunk(fileWriter, chunkWriter, subTaskId);
328-
lastCheckIndex = 0;
329472
}
330473
}
331474
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,7 @@ public void write(TimeValuePair timeValuePair, int subTaskId) throws IOException
150150

151151
checkTimeAndMayFlushChunkToCurrentFile(timestamp, subTaskId);
152152
int fileIndex = seqFileIndexArray[subTaskId];
153-
writeDataPoint(timestamp, value, chunkWriters[subTaskId]);
154-
chunkPointNumArray[subTaskId]++;
153+
writeDataPoint(timestamp, value, chunkWriters[subTaskId], subTaskId);
155154
checkChunkSizeAndMayOpenANewChunk(
156155
targetFileWriters.get(fileIndex), chunkWriters[subTaskId], subTaskId);
157156
isDeviceExistedInTargetFiles[fileIndex] = true;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,8 @@ public void endMeasurement(int subTaskId) throws IOException {
142142
@Override
143143
public void write(TimeValuePair timeValuePair, int subTaskId) throws IOException {
144144
checkPreviousTimestamp(timeValuePair.getTimestamp(), subTaskId);
145-
writeDataPoint(timeValuePair.getTimestamp(), timeValuePair.getValue(), chunkWriters[subTaskId]);
146-
chunkPointNumArray[subTaskId]++;
145+
writeDataPoint(
146+
timeValuePair.getTimestamp(), timeValuePair.getValue(), chunkWriters[subTaskId], subTaskId);
147147
checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId], subTaskId);
148148
lastTime[subTaskId] = timeValuePair.getTimestamp();
149149
lastTimeSet[subTaskId] = true;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/FastInnerCompactionWriter.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,8 @@ public boolean flushAlignedPage(AlignedPageElement alignedPageElement, int subTa
199199
valuePageHeaders,
200200
subTaskId);
201201

202+
checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId], subTaskId);
203+
202204
lastTime[subTaskId] = timePageHeader.getEndTime();
203205
lastTimeSet[subTaskId] = true;
204206
return true;
@@ -235,6 +237,8 @@ public boolean flushBatchedValuePage(
235237
valuePageHeaders,
236238
subTaskId);
237239

240+
checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId], subTaskId);
241+
238242
lastTime[subTaskId] = timePageHeader.getEndTime();
239243
lastTimeSet[subTaskId] = true;
240244
return true;
@@ -245,10 +249,12 @@ public boolean flushBatchedValuePage(
245249
* successfully or not. Return false if the unsealed page is too small or the end time of page
246250
* exceeds the end time of file, else return true.
247251
*
252+
* @throws IOException if an I/O error occurs
248253
* @throws PageException if an error occurs when writing the data page header
249254
*/
250255
public boolean flushNonAlignedPage(
251-
ByteBuffer compressedPageData, PageHeader pageHeader, int subTaskId) throws PageException {
256+
ByteBuffer compressedPageData, PageHeader pageHeader, int subTaskId)
257+
throws IOException, PageException {
252258
checkPreviousTimestamp(pageHeader.getStartTime(), subTaskId);
253259
boolean isUnsealedPageOverThreshold =
254260
chunkWriters[subTaskId].checkIsUnsealedPageOverThreshold(
@@ -261,6 +267,8 @@ public boolean flushNonAlignedPage(
261267
flushNonAlignedPageToChunkWriter(
262268
(ChunkWriterImpl) chunkWriters[subTaskId], compressedPageData, pageHeader, subTaskId);
263269

270+
checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId], subTaskId);
271+
264272
lastTime[subTaskId] = pageHeader.getEndTime();
265273
lastTimeSet[subTaskId] = true;
266274
return true;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,16 @@ public void write(TsBlock tsBlock, int subTaskId) throws IOException {
6868
checkTimeAndMayFlushChunkToCurrentFile(timestamps.getStartTime(), subTaskId);
6969
AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl) this.chunkWriters[subTaskId];
7070
chunkWriter.write(timestamps, columns, batchSize);
71+
chunkPointNumArray[subTaskId] += batchSize;
72+
if (hasVariableLengthTypeArray[subTaskId]) {
73+
writtenPointTotalSizeArray[subTaskId] += estimateWrittenPointTotalSize(tsBlock);
74+
}
7175
synchronized (this) {
7276
// we need to synchronize here to avoid multi-thread competition in sub-task
7377
TsFileResource resource = targetResources.get(seqFileIndexArray[subTaskId]);
7478
resource.updateStartTime(deviceId, timestamps.getStartTime());
7579
resource.updateEndTime(deviceId, timestamps.getEndTime());
7680
}
77-
chunkPointNumArray[subTaskId] += timestamps.getTimes().length;
7881
checkChunkSizeAndMayOpenANewChunk(
7982
targetFileWriters.get(seqFileIndexArray[subTaskId]), chunkWriter, subTaskId);
8083
isDeviceExistedInTargetFiles[seqFileIndexArray[subTaskId]] = true;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,10 @@ public void write(TsBlock tsBlock, int subTaskId) throws IOException {
7171
int batchSize = tsBlock.getPositionCount();
7272
AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl) this.chunkWriters[subTaskId];
7373
chunkWriter.write(timestamps, columns, batchSize);
74-
chunkPointNumArray[subTaskId] += timestamps.getTimes().length;
74+
chunkPointNumArray[subTaskId] += batchSize;
75+
if (hasVariableLengthTypeArray[subTaskId]) {
76+
writtenPointTotalSizeArray[subTaskId] += estimateWrittenPointTotalSize(tsBlock);
77+
}
7578
checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriter, subTaskId);
7679
}
7780

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/RepairUnsortedFileCompactionWriter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@ public void endMeasurement(int subTaskId) throws IOException {
8484
}
8585

8686
private void writeToChunkWriter(TimeValuePair timeValuePair, int subTaskId) throws IOException {
87-
writeDataPoint(timeValuePair.getTimestamp(), timeValuePair.getValue(), chunkWriters[subTaskId]);
88-
chunkPointNumArray[subTaskId]++;
87+
writeDataPoint(
88+
timeValuePair.getTimestamp(), timeValuePair.getValue(), chunkWriters[subTaskId], subTaskId);
8989
checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId], subTaskId);
9090
}
9191

0 commit comments

Comments
 (0)