Skip to content

Commit 3b9f59f

Browse files
authored
[Pipe] Fix omitted legacy tablet compatibility follow-ups (#17929)
* Fix 1.3.7 binaryBuffers * Update PipeDataNodeThriftRequestTest.java * Hotfix * spotless
1 parent d583132 commit 3b9f59f

4 files changed

Lines changed: 294 additions & 32 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java

Lines changed: 87 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545

4646
public class PipeTransferTabletBatchReq extends TPipeTransferReq {
4747

48+
private final transient List<PipeTransferTabletBinaryReq> binaryReqs = new ArrayList<>();
4849
private final transient List<PipeTransferTabletInsertNodeReq> insertNodeReqs = new ArrayList<>();
4950
private final transient List<PipeTransferTabletRawReq> tabletReqs = new ArrayList<>();
5051

@@ -60,6 +61,26 @@ public Pair<InsertRowsStatement, InsertMultiTabletsStatement> constructStatement
6061
final List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
6162
final List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();
6263

64+
for (final PipeTransferTabletBinaryReq binaryReq : binaryReqs) {
65+
final InsertBaseStatement statement = binaryReq.constructStatement();
66+
if (statement.isEmpty()) {
67+
continue;
68+
}
69+
if (statement instanceof InsertRowStatement) {
70+
insertRowStatementList.add((InsertRowStatement) statement);
71+
} else if (statement instanceof InsertTabletStatement) {
72+
insertTabletStatementList.add((InsertTabletStatement) statement);
73+
} else if (statement instanceof InsertRowsStatement) {
74+
insertRowStatementList.addAll(
75+
((InsertRowsStatement) statement).getInsertRowStatementList());
76+
} else {
77+
throw new UnsupportedOperationException(
78+
String.format(
79+
"Unknown InsertBaseStatement %s constructed from PipeTransferTabletBinaryReq.",
80+
statement));
81+
}
82+
}
83+
6384
for (final PipeTransferTabletInsertNodeReq insertNodeReq : insertNodeReqs) {
6485
final InsertBaseStatement statement = insertNodeReq.constructStatement();
6586
if (statement.isEmpty()) {
@@ -132,19 +153,52 @@ public static PipeTransferTabletBatchReq fromTPipeTransferReq(
132153
final PipeTransferTabletBatchReq batchReq = new PipeTransferTabletBatchReq();
133154
final TabletStringInternPool tabletStringInternPool = new TabletStringInternPool();
134155

135-
// Binary size, for rolling upgrade
136-
ReadWriteIOUtils.readInt(transferReq.body);
137-
int size = ReadWriteIOUtils.readInt(transferReq.body);
156+
// Legacy 1.3.x batch bodies may carry WAL binary requests before insert nodes and tablets.
157+
int size = readNonNegativeSize(transferReq.body, "binary request count");
138158
for (int i = 0; i < size; ++i) {
139-
batchReq.insertNodeReqs.add(
140-
PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(
141-
(InsertNode) PlanFragment.deserializeHelper(transferReq.body, null)));
159+
final int length = readNonNegativeSize(transferReq.body, "binary request body length");
160+
if (length > transferReq.body.remaining()) {
161+
throw new IllegalArgumentException(
162+
String.format(
163+
"Invalid binary request body length %s, remaining body length %s.",
164+
length, transferReq.body.remaining()));
165+
}
166+
final byte[] body = new byte[length];
167+
transferReq.body.get(body);
168+
batchReq.binaryReqs.add(
169+
PipeTransferTabletBinaryReq.toTPipeTransferReq(ByteBuffer.wrap(body)));
142170
}
143171

144-
size = ReadWriteIOUtils.readInt(transferReq.body);
172+
size = readNonNegativeSize(transferReq.body, "insert node count");
145173
for (int i = 0; i < size; ++i) {
146-
batchReq.tabletReqs.add(
147-
PipeTransferTabletRawReq.toTPipeTransferRawReq(transferReq.body, tabletStringInternPool));
174+
final int startPosition = transferReq.body.position();
175+
try {
176+
batchReq.insertNodeReqs.add(
177+
PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(
178+
(InsertNode) PlanFragment.deserializeHelper(transferReq.body, null)));
179+
} catch (final RuntimeException e) {
180+
throw new IllegalArgumentException(
181+
String.format(
182+
"Failed to deserialize insert node %s/%s in tablet batch at body position %s with remaining body length %s.",
183+
i + 1, size, startPosition, transferReq.body.remaining()),
184+
e);
185+
}
186+
}
187+
188+
size = readNonNegativeSize(transferReq.body, "raw tablet count");
189+
for (int i = 0; i < size; ++i) {
190+
final int startPosition = transferReq.body.position();
191+
try {
192+
batchReq.tabletReqs.add(
193+
PipeTransferTabletRawReq.toTPipeTransferRawReq(
194+
transferReq.body, tabletStringInternPool));
195+
} catch (final RuntimeException e) {
196+
throw new IllegalArgumentException(
197+
String.format(
198+
"Failed to deserialize raw tablet %s/%s in tablet batch at body position %s with remaining body length %s.",
199+
i + 1, size, startPosition, transferReq.body.remaining()),
200+
e);
201+
}
148202
}
149203

150204
batchReq.version = transferReq.version;
@@ -153,8 +207,29 @@ public static PipeTransferTabletBatchReq fromTPipeTransferReq(
153207
return batchReq;
154208
}
155209

210+
private static int readNonNegativeSize(final ByteBuffer buffer, final String fieldName) {
211+
if (buffer.remaining() < Integer.BYTES) {
212+
throw new IllegalArgumentException(
213+
String.format(
214+
"Insufficient bytes to read %s in tablet batch, remaining body length %s.",
215+
fieldName, buffer.remaining()));
216+
}
217+
218+
final int size = ReadWriteIOUtils.readInt(buffer);
219+
if (size < 0) {
220+
throw new IllegalArgumentException(
221+
String.format("Invalid negative %s %s in tablet batch.", fieldName, size));
222+
}
223+
return size;
224+
}
225+
156226
/////////////////////////////// TestOnly ///////////////////////////////
157227

228+
@TestOnly
229+
public List<PipeTransferTabletBinaryReq> getBinaryReqs() {
230+
return binaryReqs;
231+
}
232+
158233
@TestOnly
159234
public List<PipeTransferTabletInsertNodeReq> getInsertNodeReqs() {
160235
return insertNodeReqs;
@@ -176,7 +251,8 @@ public boolean equals(final Object obj) {
176251
return false;
177252
}
178253
final PipeTransferTabletBatchReq that = (PipeTransferTabletBatchReq) obj;
179-
return insertNodeReqs.equals(that.insertNodeReqs)
254+
return binaryReqs.equals(that.binaryReqs)
255+
&& insertNodeReqs.equals(that.insertNodeReqs)
180256
&& tabletReqs.equals(that.tabletReqs)
181257
&& version == that.version
182258
&& type == that.type
@@ -185,6 +261,6 @@ public boolean equals(final Object obj) {
185261

186262
@Override
187263
public int hashCode() {
188-
return Objects.hash(insertNodeReqs, tabletReqs, version, type, body);
264+
return Objects.hash(binaryReqs, insertNodeReqs, tabletReqs, version, type, body);
189265
}
190266
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,8 +188,17 @@ private void deserializeTPipeTransferRawReq(
188188
buffer.position(startPosition);
189189
}
190190

191-
tablet = PipeTabletUtils.internTablet(Tablet.deserialize(buffer), tabletStringInternPool);
192-
isAligned = ReadWriteIOUtils.readBool(buffer);
191+
try {
192+
tablet = PipeTabletUtils.internTablet(Tablet.deserialize(buffer), tabletStringInternPool);
193+
isAligned = ReadWriteIOUtils.readBool(buffer);
194+
} catch (final RuntimeException e) {
195+
buffer.position(startPosition);
196+
throw new IllegalArgumentException(
197+
String.format(
198+
"Failed to deserialize raw tablet request at body position %s with remaining body length %s.",
199+
startPosition, buffer.remaining()),
200+
e);
201+
}
193202
}
194203

195204
private static void ensureStatementDeserializedFromCurrentTabletFormat(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java

Lines changed: 71 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.apache.tsfile.enums.TSDataType;
3333
import org.apache.tsfile.utils.Binary;
3434
import org.apache.tsfile.utils.BitMap;
35-
import org.apache.tsfile.utils.BytesUtils;
3635
import org.apache.tsfile.utils.Pair;
3736
import org.apache.tsfile.utils.ReadWriteIOUtils;
3837
import org.apache.tsfile.write.UnSupportedDataTypeException;
@@ -116,12 +115,19 @@ private static InsertTabletStatement deserializeStatementFromTabletFormat(
116115
intern(ReadWriteIOUtils.readString(byteBuffer), tabletStringInternPool);
117116

118117
final int rowSize = ReadWriteIOUtils.readInt(byteBuffer);
118+
if (rowSize < 0) {
119+
throw new IllegalArgumentException(
120+
String.format("Invalid row size %s in tablet format deserialization.", rowSize));
121+
}
119122

120123
// deserialize schemas
121124
final int schemaSize =
122-
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer))
123-
? ReadWriteIOUtils.readInt(byteBuffer)
124-
: 0;
125+
readBooleanByte(byteBuffer, "schema existence") ? ReadWriteIOUtils.readInt(byteBuffer) : 0;
126+
if (schemaSize < 0) {
127+
throw new IllegalArgumentException(
128+
String.format("Invalid schema size %s in tablet format deserialization.", schemaSize));
129+
}
130+
ensureRemaining(byteBuffer, schemaSize, "measurement schema existence flags");
125131
final String[] measurement = new String[schemaSize];
126132
final TsTableColumnCategory[] columnCategories = new TsTableColumnCategory[schemaSize];
127133
final TSDataType[] dataTypes = new TSDataType[schemaSize];
@@ -148,15 +154,26 @@ private static InsertTabletStatement deserializeStatementFromTabletFormat(
148154

149155
// Deserialize and calculate memory in the same loop
150156
for (int i = 0; i < schemaSize; i++) {
151-
final boolean hasSchema = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
157+
final boolean hasSchema = readBooleanByte(byteBuffer, "measurement schema existence");
152158
if (hasSchema) {
153159
final Pair<String, TSDataType> pair = readMeasurement(byteBuffer, tabletStringInternPool);
154160
measurement[i] = pair.getLeft();
155161
dataTypes[i] = pair.getRight();
156162
if (readColumnCategory) {
163+
if (!byteBuffer.hasRemaining()) {
164+
throw new IllegalArgumentException(
165+
"Missing column category in current tablet format deserialization.");
166+
}
167+
final byte columnCategory = byteBuffer.get();
168+
if (columnCategory < 0 || columnCategory >= ColumnCategory.values().length) {
169+
throw new IllegalArgumentException(
170+
String.format(
171+
"Invalid column category %s in current tablet format deserialization.",
172+
columnCategory));
173+
}
157174
columnCategories[i] =
158175
TsTableColumnCategory.fromTsFileColumnCategory(
159-
ColumnCategory.values()[byteBuffer.get()]);
176+
ColumnCategory.values()[columnCategory]);
160177
}
161178

162179
// Calculate memory for each measurement string
@@ -178,14 +195,22 @@ private static InsertTabletStatement deserializeStatementFromTabletFormat(
178195
memorySize += measurementMemorySize;
179196
memorySize += dataTypesMemorySize;
180197

198+
final boolean isTimesNotNull = readBooleanByte(byteBuffer, "timestamp column existence");
199+
if (rowSize > 0 && !isTimesNotNull) {
200+
throw new IllegalArgumentException(
201+
"Missing timestamps in tablet format deserialization with non-empty rows.");
202+
}
203+
if (isTimesNotNull) {
204+
ensureRemaining(byteBuffer, (long) Long.BYTES * rowSize, "timestamps");
205+
}
206+
181207
// deserialize times and calculate memory during deserialization
182208
final long[] times = new long[rowSize];
183209
// Calculate memory: array header + long size * rowSize
184210
final long timesMemorySize =
185211
org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize(
186212
NUM_BYTES_ARRAY_HEADER + (long) Long.BYTES * rowSize);
187213

188-
final boolean isTimesNotNull = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
189214
if (isTimesNotNull) {
190215
for (int i = 0; i < rowSize; i++) {
191216
times[i] = ReadWriteIOUtils.readLong(byteBuffer);
@@ -199,7 +224,7 @@ private static InsertTabletStatement deserializeStatementFromTabletFormat(
199224
final BitMap[] bitMaps;
200225
final long bitMapsMemorySize;
201226

202-
final boolean isBitMapsNotNull = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
227+
final boolean isBitMapsNotNull = readBooleanByte(byteBuffer, "bitmap column existence");
203228
if (isBitMapsNotNull) {
204229
// Use the method that returns both BitMap array and memory size
205230
final Pair<BitMap[], Long> bitMapsAndMemory =
@@ -218,7 +243,11 @@ private static InsertTabletStatement deserializeStatementFromTabletFormat(
218243
final Object[] values;
219244
final long valuesMemorySize;
220245

221-
final boolean isValuesNotNull = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
246+
final boolean isValuesNotNull = readBooleanByte(byteBuffer, "value column existence");
247+
if (rowSize > 0 && schemaSize > 0 && !isValuesNotNull) {
248+
throw new IllegalArgumentException(
249+
"Missing values in tablet format deserialization with non-empty rows.");
250+
}
222251
if (isValuesNotNull) {
223252
// Use the method that returns both values array and memory size
224253
final Pair<Object[], Long> valuesAndMemory =
@@ -236,7 +265,7 @@ private static InsertTabletStatement deserializeStatementFromTabletFormat(
236265
// Add values memory to total
237266
memorySize += valuesMemorySize;
238267

239-
final boolean isAligned = ReadWriteIOUtils.readBoolean(byteBuffer);
268+
final boolean isAligned = readBooleanByte(byteBuffer, "alignment");
240269

241270
statement.setMeasurements(measurement);
242271
statement.setTimes(times);
@@ -321,6 +350,30 @@ private static void skipString(final ByteBuffer buffer) {
321350
}
322351
}
323352

353+
private static boolean readBooleanByte(final ByteBuffer buffer, final String fieldName) {
354+
if (!buffer.hasRemaining()) {
355+
throw new IllegalArgumentException(
356+
String.format("Missing %s flag in tablet format deserialization.", fieldName));
357+
}
358+
359+
final byte value = ReadWriteIOUtils.readByte(buffer);
360+
if (value != 0 && value != 1) {
361+
throw new IllegalArgumentException(
362+
String.format("Invalid %s flag %s in tablet format deserialization.", fieldName, value));
363+
}
364+
return value == 1;
365+
}
366+
367+
private static void ensureRemaining(
368+
final ByteBuffer buffer, final long expectedSize, final String fieldName) {
369+
if (expectedSize > buffer.remaining()) {
370+
throw new IllegalArgumentException(
371+
String.format(
372+
"Insufficient bytes for %s in tablet format deserialization, expected %s, remaining %s.",
373+
fieldName, expectedSize, buffer.remaining()));
374+
}
375+
}
376+
324377
/**
325378
* Read measurement name and data type from buffer, skipping other measurement schema fields
326379
* (encoding, compression, and tags/attributes) that are not needed for InsertTabletStatement.
@@ -364,9 +417,13 @@ private static Pair<BitMap[], Long> readBitMapsFromBufferWithMemory(
364417
boolean hasMarkedBitMap = false;
365418

366419
for (int i = 0; i < columns; i++) {
367-
final boolean hasBitMap = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
420+
final boolean hasBitMap = readBooleanByte(byteBuffer, "bitmap existence");
368421
if (hasBitMap) {
369422
final int size = ReadWriteIOUtils.readInt(byteBuffer);
423+
if (size < 0) {
424+
throw new IllegalArgumentException(
425+
String.format("Invalid bitmap size %s in tablet format deserialization.", size));
426+
}
370427
final Binary valueBinary = ReadWriteIOUtils.readBinary(byteBuffer);
371428
final byte[] byteArray = valueBinary.getValues();
372429
final BitMap bitMap = new BitMap(size, byteArray);
@@ -416,8 +473,7 @@ private static Pair<Object[], Long> readValuesFromBufferWithMemory(
416473
NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * columns);
417474

418475
for (int i = 0; i < columns; i++) {
419-
final boolean isValueColumnsNotNull =
420-
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
476+
final boolean isValueColumnsNotNull = readBooleanByte(byteBuffer, "value column existence");
421477
if (types[i] == null) {
422478
continue;
423479
}
@@ -427,7 +483,7 @@ private static Pair<Object[], Long> readValuesFromBufferWithMemory(
427483
final boolean[] boolValues = new boolean[rowSize];
428484
if (isValueColumnsNotNull) {
429485
for (int index = 0; index < rowSize; index++) {
430-
boolValues[index] = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
486+
boolValues[index] = readBooleanByte(byteBuffer, "boolean value");
431487
}
432488
}
433489
values[i] = boolValues;
@@ -503,8 +559,7 @@ private static Pair<Object[], Long> readValuesFromBufferWithMemory(
503559

504560
if (isValueColumnsNotNull) {
505561
for (int index = 0; index < rowSize; index++) {
506-
final boolean isNotNull =
507-
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
562+
final boolean isNotNull = readBooleanByte(byteBuffer, "binary value existence");
508563
if (isNotNull) {
509564
binaryValues[index] = ReadWriteIOUtils.readBinary(byteBuffer);
510565
// Calculate memory for each Binary object during deserialization

0 commit comments

Comments
 (0)