Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions bolt/dwio/parquet/reader/PageReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,26 @@ void PageReader::seekToPage(int64_t row, bool keepRepDefRawData) {
numRowsInPage_ = 0;
break;
}
// For nested (non top level) columns, 'preloadRepDefs' only fully
// decodes the first 'decodeRepDefPageCount_' pages and keeps the
// remaining pages as raw rep/def bytes inside 'preloadedRepDefs_'.
// When a leaf-level filter pushdown triggers a skip()/seekToPage()
// that walks past the sampled boundary, the consumer side
// ('setPageRowInfo' inside 'prepareDataPageV1') indexes
// 'numLeavesInPage_[pageIndex_]' and would go out of bounds.
//
// Materialise just enough preloaded rep/def bytes BEFORE entering
// the next page so that 'setPageRowInfo' remains a pure lookup.
// The loop only fires when we are about to cross the sampled
// boundary and there is something left to decode, mirroring (and
// complementing) the "ahead by one page" invariant established at
// the exit of 'decodeRepDefs'.
if (hasChunkRepDefs_ && !isTopLevel_ && maxRepeat_ > 0) {
while (pageIndex_ + 1 >= static_cast<int32_t>(numLeavesInPage_.size()) &&
!preloadedRepDefs_.empty()) {
loadMoreRepDefs();
}
}
PageHeader pageHeader = readPageHeader();
pageStart_ = pageDataStart_ + pageHeader.compressed_page_size;

Expand Down
Binary file not shown.
58 changes: 58 additions & 0 deletions bolt/dwio/parquet/tests/reader/ParquetReaderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2904,3 +2904,61 @@ INSTANTIATE_TEST_SUITE_P(
[](const testing::TestParamInfo<FloatToDoubleTestParam>& info) {
return info.param.toString();
});

TEST_F(ParquetReaderTest, lazyRepDefSanitizedCustomTagRepro) {
const std::string kFilePath =
getExampleFilePath("bolt_lazy_repdef_sanitized_custom_tag.parquet");
constexpr int32_t kBatchRows = 4096;

if (!std::filesystem::exists(kFilePath)) {
GTEST_SKIP() << "Sanitized repro parquet file is not available: "
<< kFilePath;
}

auto rowType =
ROW({"filter_col", "map_col"}, {INTEGER(), MAP(VARCHAR(), VARCHAR())});

auto scanSpec = makeScanSpec(rowType);
scanSpec->getOrCreateChild("filter_col")->setFilter(exec::between(3, 4));

auto* mapSpec = scanSpec->getOrCreateChild("map_col");
mapSpec->setExtractValues(true);
for (auto& child : mapSpec->children()) {
child->setExtractValues(true);
}

bytedance::bolt::dwio::common::ReaderOptions readerOpts{leafPool_.get()};
auto reader = createReader(kFilePath, readerOpts);
auto rowReaderOpts = getReaderOpts(rowType);
rowReaderOpts.setScanSpec(scanSpec);
rowReaderOpts.setDecodeRepDefPageCount(10);
rowReaderOpts.setParquetRepDefMemoryLimit(16UL << 20);

auto rowReader = reader->createRowReader(rowReaderOpts);
VectorPtr result = BaseVector::create(rowType, 0, leafPool_.get());
uint64_t totalRows = 0;
uint64_t totalCustomTagEntries = 0;
for (;;) {
const auto got = rowReader->next(kBatchRows, result);
if (got == 0) {
break;
}

auto* row = result->as<RowVector>();
ASSERT_NE(row, nullptr);
ASSERT_EQ(row->childrenSize(), 2);
auto customTag = row->childAt(1)->loadedVector();
ASSERT_NE(customTag, nullptr);
auto* map = customTag->as<MapVector>();
ASSERT_NE(map, nullptr);
for (vector_size_t i = 0; i < map->size(); ++i) {
if (!map->isNullAt(i)) {
totalCustomTagEntries += map->sizeAt(i);
}
}
totalRows += result->size();
}

EXPECT_GT(totalRows, 0);
EXPECT_GT(totalCustomTagEntries, 0);
}
Loading