Skip to content

Commit f2a6870

Browse files
authored
[VL] Remove buffering of sorted partitions in RSS writer to prevent OOM (#11059)
1 parent cd30767 commit f2a6870

3 files changed

Lines changed: 20 additions & 44 deletions

File tree

cpp/core/shuffle/rss/RssPartitionWriter.cc

Lines changed: 17 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,7 @@ void RssPartitionWriter::init() {
3030
}
3131

3232
arrow::Status RssPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
33-
if (rssOs_ != nullptr && !rssOs_->closed()) {
34-
if (compressedOs_ != nullptr) {
35-
RETURN_NOT_OK(compressedOs_->Close());
36-
compressTime_ = compressedOs_->compressTime();
37-
spillTime_ -= compressTime_;
38-
}
39-
ARROW_ASSIGN_OR_RAISE(const auto buffer, rssOs_->Finish());
40-
bytesEvicted_[lastEvictedPartitionId_] +=
41-
rssClient_->pushPartitionData(lastEvictedPartitionId_, buffer->data_as<char>(), buffer->size());
42-
}
43-
33+
spillTime_ -= compressTime_;
4434
rssClient_->stop();
4535

4636
auto totalBytesEvicted = std::accumulate(bytesEvicted_.begin(), bytesEvicted_.end(), 0LL);
@@ -70,36 +60,25 @@ arrow::Status RssPartitionWriter::hashEvict(
7060
arrow::Status
7161
RssPartitionWriter::sortEvict(uint32_t partitionId, std::unique_ptr<InMemoryPayload> inMemoryPayload, bool isFinal) {
7262
ScopedTimer timer(&spillTime_);
73-
if (lastEvictedPartitionId_ != partitionId) {
74-
if (lastEvictedPartitionId_ != -1) {
75-
GLUTEN_DCHECK(rssOs_ != nullptr && !rssOs_->closed(), "rssOs_ should not be null");
76-
if (compressedOs_ != nullptr) {
77-
RETURN_NOT_OK(compressedOs_->Flush());
78-
}
79-
80-
ARROW_ASSIGN_OR_RAISE(const auto buffer, rssOs_->Finish());
81-
bytesEvicted_[lastEvictedPartitionId_] +=
82-
rssClient_->pushPartitionData(lastEvictedPartitionId_, buffer->data_as<char>(), buffer->size());
83-
}
84-
85-
ARROW_ASSIGN_OR_RAISE(
86-
rssOs_, arrow::io::BufferOutputStream::Create(options_->pushBufferMaxSize, arrow::default_memory_pool()));
87-
if (codec_ != nullptr) {
88-
ARROW_ASSIGN_OR_RAISE(
89-
compressedOs_,
90-
ShuffleCompressedOutputStream::Make(
91-
codec_.get(), options_->compressionBufferSize, rssOs_, arrow::default_memory_pool()));
92-
}
93-
94-
lastEvictedPartitionId_ = partitionId;
95-
}
96-
9763
rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
98-
if (compressedOs_ != nullptr) {
99-
RETURN_NOT_OK(inMemoryPayload->serialize(compressedOs_.get()));
64+
ARROW_ASSIGN_OR_RAISE(
65+
auto rssOs, arrow::io::BufferOutputStream::Create(options_->pushBufferMaxSize, arrow::default_memory_pool()));
66+
if (codec_ != nullptr) {
67+
ARROW_ASSIGN_OR_RAISE(
68+
auto compressedOs,
69+
ShuffleCompressedOutputStream::Make(
70+
codec_.get(), options_->compressionBufferSize, rssOs, arrow::default_memory_pool()));
71+
RETURN_NOT_OK(inMemoryPayload->serialize(compressedOs.get()));
72+
RETURN_NOT_OK(compressedOs->Flush());
73+
RETURN_NOT_OK(compressedOs->Close());
74+
compressTime_ += compressedOs->compressTime();
10075
} else {
101-
RETURN_NOT_OK(inMemoryPayload->serialize(rssOs_.get()));
76+
RETURN_NOT_OK(inMemoryPayload->serialize(rssOs.get()));
10277
}
78+
ARROW_ASSIGN_OR_RAISE(const auto buffer, rssOs->Finish());
79+
bytesEvicted_[partitionId] +=
80+
rssClient_->pushPartitionData(partitionId, buffer->data_as<char>(), buffer->size());
81+
10382
return arrow::Status::OK();
10483
}
10584

cpp/core/shuffle/rss/RssPartitionWriter.h

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@
2626

2727
namespace gluten {
2828

29-
class RssPartitionWriterOutputStream;
30-
3129
class RssPartitionWriter final : public PartitionWriter {
3230
public:
3331
RssPartitionWriter(
@@ -65,10 +63,6 @@ class RssPartitionWriter final : public PartitionWriter {
6563

6664
std::vector<int64_t> bytesEvicted_;
6765
std::vector<int64_t> rawPartitionLengths_;
68-
69-
int32_t lastEvictedPartitionId_{-1};
70-
std::shared_ptr<arrow::io::BufferOutputStream> rssOs_{nullptr};
71-
std::shared_ptr<ShuffleCompressedOutputStream> compressedOs_{nullptr};
7266
};
7367

7468
} // namespace gluten

gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,9 @@ public CelebornShuffleManager(SparkConf conf) {
134134
rowBasedConf.set(SPARK_CELEBORN_COMPRESSION_CODEC_KEY, celebornDefaultCodec);
135135
rowBasedCelebornConf.set(CELEBORN_COMPRESSION_CODEC_KEY, celebornDefaultCodec);
136136
}
137+
138+
// Disable celeborn compression
139+
celebornConf.set(CELEBORN_COMPRESSION_CODEC_KEY, "none");
137140
}
138141

139142
private boolean isDriver() {

0 commit comments

Comments
 (0)