Skip to content

Commit 9e316c3

Browse files
HIVE-29598: Fix vectorized outer join wrong results due to stale scratch column values (#6486)
Co-authored-by: konstantinb <konstantinb@users.noreply.github.com>
1 parent 985a11a commit 9e316c3

17 files changed

Lines changed: 774 additions & 10 deletions

File tree

ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -587,16 +587,12 @@ protected void generateOuterNulls(VectorizedRowBatch batch, int[] noMatchs,
587587
// key as null, too.
588588
//
589589
for (int column : outerSmallTableKeyColumnMap) {
590-
ColumnVector colVector = batch.cols[column];
591-
colVector.noNulls = false;
592-
colVector.isNull[batchIndex] = true;
590+
batch.cols[column].clearValue(batchIndex);
593591
}
594592

595593
// Small table values are set to null.
596594
for (int column : smallTableValueColumnMap) {
597-
ColumnVector colVector = batch.cols[column];
598-
colVector.noNulls = false;
599-
colVector.isNull[batchIndex] = true;
595+
batch.cols[column].clearValue(batchIndex);
600596
}
601597
}
602598
}
@@ -746,15 +742,13 @@ protected void generateOuterNullsRepeatedAll(VectorizedRowBatch batch) throws Hi
746742
//
747743
for (int column : outerSmallTableKeyColumnMap) {
748744
ColumnVector colVector = batch.cols[column];
749-
colVector.noNulls = false;
750-
colVector.isNull[0] = true;
745+
colVector.clearValue(0);
751746
colVector.isRepeating = true;
752747
}
753748

754749
for (int column : smallTableValueColumnMap) {
755750
ColumnVector colVector = batch.cols[column];
756-
colVector.noNulls = false;
757-
colVector.isNull[0] = true;
751+
colVector.clearValue(0);
758752
colVector.isRepeating = true;
759753
}
760754
}
Lines changed: 361 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,361 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.hive.ql.exec.vector.mapjoin;
20+
21+
import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
22+
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
23+
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
24+
import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
25+
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
26+
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
27+
import org.apache.hadoop.hive.ql.exec.vector.IntervalDayTimeColumnVector;
28+
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
29+
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
30+
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
31+
import org.apache.hadoop.hive.ql.exec.vector.VoidColumnVector;
32+
import org.apache.hadoop.hive.ql.metadata.HiveException;
33+
import org.junit.jupiter.api.Test;
34+
import org.junit.jupiter.params.ParameterizedTest;
35+
import org.junit.jupiter.params.provider.Arguments;
36+
import org.junit.jupiter.params.provider.MethodSource;
37+
38+
import java.io.IOException;
39+
import java.nio.charset.StandardCharsets;
40+
import java.util.ArrayList;
41+
import java.util.Arrays;
42+
import java.util.List;
43+
import java.util.stream.Stream;
44+
45+
import static org.junit.jupiter.api.Assertions.assertEquals;
46+
import static org.junit.jupiter.api.Assertions.assertFalse;
47+
import static org.junit.jupiter.api.Assertions.assertNull;
48+
import static org.junit.jupiter.api.Assertions.assertTrue;
49+
50+
/**
51+
* HIVE-29598: verifies {@link VectorMapJoinOuterGenerateResultOperator} clears
52+
* every small-table slot for unmatched rows, so stale values cannot carry over
53+
* past the null marking.
54+
*/
55+
class TestVectorMapJoinOuterGenerateResultOperator {
56+
57+
/** Concrete subclass that exposes the generateOuterNulls* methods to tests. */
58+
private static final class TestableOuterOp extends VectorMapJoinOuterGenerateResultOperator {
59+
@Override
60+
protected String getLoggingPrefix() {
61+
throw new UnsupportedOperationException("stubbed only to instantiate abstract class under test");
62+
}
63+
64+
@Override
65+
public void processBatch(VectorizedRowBatch batch) {
66+
throw new UnsupportedOperationException("stubbed only to instantiate abstract class under test");
67+
}
68+
}
69+
70+
/**
71+
* Records {@code clearSlotValue} invocations to verify the operator dispatches
72+
* through {@code clearValue}, not just produces the slot-clearing side effect.
73+
*/
74+
private static class TrackingLongColumnVector extends LongColumnVector {
75+
final List<Integer> clearedIndices = new ArrayList<>();
76+
77+
TrackingLongColumnVector(int size) {
78+
super(size);
79+
}
80+
81+
@Override
82+
protected void clearSlotValue(int elementNum) {
83+
super.clearSlotValue(elementNum);
84+
clearedIndices.add(elementNum);
85+
}
86+
}
87+
88+
@Test
89+
void generateOuterNullsCallsClearValueOnEachMappedColumnForEachUnmatchedRow() throws HiveException, IOException {
90+
TestableOuterOp op = new TestableOuterOp();
91+
op.outerSmallTableKeyColumnMap = new int[] {0};
92+
op.smallTableValueColumnMap = new int[] {1, 2};
93+
94+
VectorizedRowBatch batch = new VectorizedRowBatch(3, 4);
95+
TrackingLongColumnVector keyCol = new TrackingLongColumnVector(4);
96+
TrackingLongColumnVector valCol1 = new TrackingLongColumnVector(4);
97+
TrackingLongColumnVector valCol2 = new TrackingLongColumnVector(4);
98+
keyCol.vector[1] = 99L;
99+
valCol1.vector[1] = 88L;
100+
valCol2.vector[3] = 77L;
101+
batch.cols[0] = keyCol;
102+
batch.cols[1] = valCol1;
103+
batch.cols[2] = valCol2;
104+
105+
int[] noMatchs = new int[] {1, 3};
106+
op.generateOuterNulls(batch, noMatchs, noMatchs.length);
107+
108+
assertEquals(Arrays.asList(1, 3), keyCol.clearedIndices);
109+
assertEquals(Arrays.asList(1, 3), valCol1.clearedIndices);
110+
assertEquals(Arrays.asList(1, 3), valCol2.clearedIndices);
111+
112+
assertFalse(keyCol.noNulls);
113+
assertTrue(keyCol.isNull[1]);
114+
assertTrue(keyCol.isNull[3]);
115+
assertFalse(keyCol.isNull[0]);
116+
assertFalse(keyCol.isNull[2]);
117+
118+
assertEquals(0L, keyCol.vector[1]);
119+
assertEquals(0L, valCol1.vector[1]);
120+
assertEquals(0L, valCol2.vector[3]);
121+
}
122+
123+
@Test
124+
void generateOuterNullsRepeatedAllCallsClearValueAtIndexZeroForEachMappedColumn() throws HiveException {
125+
TestableOuterOp op = new TestableOuterOp();
126+
op.outerSmallTableKeyColumnMap = new int[] {0};
127+
op.smallTableValueColumnMap = new int[] {1};
128+
129+
VectorizedRowBatch batch = new VectorizedRowBatch(2, 4);
130+
TrackingLongColumnVector keyCol = new TrackingLongColumnVector(4);
131+
TrackingLongColumnVector valCol = new TrackingLongColumnVector(4);
132+
keyCol.vector[0] = 42L;
133+
valCol.vector[0] = 84L;
134+
batch.cols[0] = keyCol;
135+
batch.cols[1] = valCol;
136+
137+
op.generateOuterNullsRepeatedAll(batch);
138+
139+
assertEquals(Arrays.asList(0), keyCol.clearedIndices);
140+
assertEquals(Arrays.asList(0), valCol.clearedIndices);
141+
142+
// isRepeating is set by the operator, not by clearValue.
143+
assertFalse(keyCol.noNulls);
144+
assertTrue(keyCol.isNull[0]);
145+
assertTrue(keyCol.isRepeating);
146+
assertFalse(valCol.noNulls);
147+
assertTrue(valCol.isNull[0]);
148+
assertTrue(valCol.isRepeating);
149+
150+
assertEquals(0L, keyCol.vector[0]);
151+
assertEquals(0L, valCol.vector[0]);
152+
}
153+
154+
@Test
155+
void generateOuterNullsSetsBookkeepingOnTypeWithNoClearSlotValueOverride() throws HiveException, IOException {
156+
// VoidColumnVector inherits the base no-op clearSlotValue — verifies the
157+
// operator still drives the null-marking through clearValue() on a type
158+
// without a per-slot value to zero.
159+
TestableOuterOp op = new TestableOuterOp();
160+
op.outerSmallTableKeyColumnMap = new int[] {};
161+
op.smallTableValueColumnMap = new int[] {0};
162+
163+
VectorizedRowBatch batch = new VectorizedRowBatch(1, 4);
164+
VoidColumnVector voidCol = new VoidColumnVector(4);
165+
batch.cols[0] = voidCol;
166+
167+
int[] noMatchs = new int[] {1, 3};
168+
op.generateOuterNulls(batch, noMatchs, noMatchs.length);
169+
170+
assertFalse(voidCol.noNulls);
171+
assertTrue(voidCol.isNull[1]);
172+
assertTrue(voidCol.isNull[3]);
173+
assertFalse(voidCol.isNull[0]);
174+
assertFalse(voidCol.isNull[2]);
175+
}
176+
177+
/**
178+
* For each {@link ColumnVector} subclass whose {@code clearSlotValue} is
179+
* overridden, verifies the operator's call through {@code clearValue} reaches
180+
* the override and clears the slot to the type's cleared state.
181+
*/
182+
@ParameterizedTest(name = "{0}")
183+
@MethodSource("modifiedColumnVectorTypes")
184+
void generateOuterNullsClearsSlotForEachModifiedType(
185+
String typeName,
186+
ColumnVector cv,
187+
Runnable preLoad,
188+
Runnable assertSlotCleared) throws HiveException, IOException {
189+
190+
TestableOuterOp op = new TestableOuterOp();
191+
op.outerSmallTableKeyColumnMap = new int[] {};
192+
op.smallTableValueColumnMap = new int[] {0};
193+
194+
VectorizedRowBatch batch = new VectorizedRowBatch(1, 4);
195+
preLoad.run();
196+
batch.cols[0] = cv;
197+
198+
int[] noMatchs = new int[] {2};
199+
op.generateOuterNulls(batch, noMatchs, noMatchs.length);
200+
201+
assertTrue(cv.isNull[2]);
202+
assertFalse(cv.noNulls);
203+
assertSlotCleared.run();
204+
}
205+
206+
@ParameterizedTest(name = "{0}")
207+
@MethodSource("modifiedColumnVectorTypesAtSlotZero")
208+
void generateOuterNullsRepeatedAllClearsSlotForEachModifiedType(
209+
String typeName,
210+
ColumnVector cv,
211+
Runnable preLoad,
212+
Runnable assertSlotCleared) throws HiveException {
213+
214+
TestableOuterOp op = new TestableOuterOp();
215+
op.outerSmallTableKeyColumnMap = new int[] {};
216+
op.smallTableValueColumnMap = new int[] {0};
217+
218+
VectorizedRowBatch batch = new VectorizedRowBatch(1, 4);
219+
preLoad.run();
220+
batch.cols[0] = cv;
221+
222+
op.generateOuterNullsRepeatedAll(batch);
223+
224+
assertTrue(cv.isNull[0]);
225+
assertFalse(cv.noNulls);
226+
assertTrue(cv.isRepeating);
227+
assertSlotCleared.run();
228+
}
229+
230+
static Stream<Arguments> modifiedColumnVectorTypesAtSlotZero() {
231+
final LongColumnVector longCv = new LongColumnVector(4);
232+
final DoubleColumnVector doubleCv = new DoubleColumnVector(4);
233+
final BytesColumnVector bytesCv = new BytesColumnVector(4);
234+
final DecimalColumnVector decCv = new DecimalColumnVector(4, 18, 4);
235+
final Decimal64ColumnVector dec64Cv = new Decimal64ColumnVector(4, 18, 4);
236+
final TimestampColumnVector tsCv = new TimestampColumnVector(4);
237+
final IntervalDayTimeColumnVector ivCv = new IntervalDayTimeColumnVector(4);
238+
239+
return Stream.of(
240+
Arguments.of(
241+
"LongColumnVector",
242+
longCv,
243+
(Runnable) () -> longCv.vector[0] = 999L,
244+
(Runnable) () -> assertEquals(0L, longCv.vector[0])),
245+
Arguments.of(
246+
"DoubleColumnVector",
247+
doubleCv,
248+
(Runnable) () -> doubleCv.vector[0] = 3.14,
249+
(Runnable) () -> assertEquals(0.0, doubleCv.vector[0])),
250+
Arguments.of(
251+
"BytesColumnVector",
252+
bytesCv,
253+
(Runnable) () -> {
254+
bytesCv.vector[0] = "stale".getBytes(StandardCharsets.UTF_8);
255+
bytesCv.start[0] = 1;
256+
bytesCv.length[0] = 3;
257+
},
258+
(Runnable) () -> {
259+
assertNull(bytesCv.vector[0]);
260+
assertEquals(0, bytesCv.start[0]);
261+
assertEquals(0, bytesCv.length[0]);
262+
}),
263+
Arguments.of(
264+
"DecimalColumnVector",
265+
decCv,
266+
(Runnable) () -> decCv.vector[0].setFromLong(999L),
267+
(Runnable) () -> assertEquals(0L, decCv.vector[0].serialize64(decCv.scale))),
268+
Arguments.of(
269+
"Decimal64ColumnVector",
270+
dec64Cv,
271+
(Runnable) () -> dec64Cv.vector[0] = 999L,
272+
(Runnable) () -> assertEquals(0L, dec64Cv.vector[0])),
273+
Arguments.of(
274+
"TimestampColumnVector",
275+
tsCv,
276+
(Runnable) () -> {
277+
tsCv.time[0] = 1234567890000L;
278+
tsCv.nanos[0] = 999;
279+
},
280+
(Runnable) () -> {
281+
assertEquals(0L, tsCv.time[0]);
282+
assertEquals(1, tsCv.nanos[0]);
283+
}),
284+
Arguments.of(
285+
"IntervalDayTimeColumnVector",
286+
ivCv,
287+
(Runnable) () -> ivCv.set(0, new HiveIntervalDayTime(5, 0)),
288+
(Runnable) () -> {
289+
assertEquals(0L, ivCv.getTotalSeconds(0));
290+
assertEquals(1, ivCv.getNanos(0));
291+
})
292+
);
293+
}
294+
295+
static Stream<Arguments> modifiedColumnVectorTypes() {
296+
final LongColumnVector longCv = new LongColumnVector(4);
297+
final DoubleColumnVector doubleCv = new DoubleColumnVector(4);
298+
final BytesColumnVector bytesCv = new BytesColumnVector(4);
299+
final DecimalColumnVector decCv = new DecimalColumnVector(4, 18, 4);
300+
final Decimal64ColumnVector dec64Cv = new Decimal64ColumnVector(4, 18, 4);
301+
final TimestampColumnVector tsCv = new TimestampColumnVector(4);
302+
final IntervalDayTimeColumnVector ivCv = new IntervalDayTimeColumnVector(4);
303+
304+
return Stream.of(
305+
Arguments.of(
306+
"LongColumnVector",
307+
longCv,
308+
(Runnable) () -> longCv.vector[2] = 999L,
309+
(Runnable) () -> assertEquals(0L, longCv.vector[2])),
310+
Arguments.of(
311+
"DoubleColumnVector",
312+
doubleCv,
313+
(Runnable) () -> doubleCv.vector[2] = 3.14,
314+
(Runnable) () -> assertEquals(0.0, doubleCv.vector[2])),
315+
Arguments.of(
316+
"BytesColumnVector",
317+
bytesCv,
318+
(Runnable) () -> {
319+
bytesCv.vector[2] = "stale".getBytes(StandardCharsets.UTF_8);
320+
bytesCv.start[2] = 1;
321+
bytesCv.length[2] = 3;
322+
},
323+
(Runnable) () -> {
324+
assertNull(bytesCv.vector[2]);
325+
assertEquals(0, bytesCv.start[2]);
326+
assertEquals(0, bytesCv.length[2]);
327+
}),
328+
Arguments.of(
329+
"DecimalColumnVector",
330+
decCv,
331+
(Runnable) () -> decCv.vector[2].setFromLong(999L),
332+
(Runnable) () -> assertEquals(0L, decCv.vector[2].serialize64(decCv.scale))),
333+
Arguments.of(
334+
"Decimal64ColumnVector",
335+
dec64Cv,
336+
(Runnable) () -> dec64Cv.vector[2] = 999L,
337+
(Runnable) () -> assertEquals(0L, dec64Cv.vector[2])),
338+
Arguments.of(
339+
"TimestampColumnVector",
340+
tsCv,
341+
(Runnable) () -> {
342+
tsCv.time[2] = 1234567890000L;
343+
tsCv.nanos[2] = 999;
344+
},
345+
(Runnable) () -> {
346+
// setNullValue convention: time = 0, nanos = 1
347+
assertEquals(0L, tsCv.time[2]);
348+
assertEquals(1, tsCv.nanos[2]);
349+
}),
350+
Arguments.of(
351+
"IntervalDayTimeColumnVector",
352+
ivCv,
353+
(Runnable) () -> ivCv.set(2, new HiveIntervalDayTime(5, 0)),
354+
(Runnable) () -> {
355+
// setNullValue convention: totalSeconds = 0, nanos = 1
356+
assertEquals(0L, ivCv.getTotalSeconds(2));
357+
assertEquals(1, ivCv.getNanos(2));
358+
})
359+
);
360+
}
361+
}

0 commit comments

Comments
 (0)