Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/Configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ nav_order: 15
| spark.gluten.sql.columnar.shuffle.sort.partitions.threshold | 4000 | The threshold to determine whether to use sort-based columnar shuffle. Sort-based shuffle will be used if the number of partitions is greater than this threshold. |
| spark.gluten.sql.columnar.shuffledHashJoin | true | Enable or disable columnar shuffledHashJoin. |
| spark.gluten.sql.columnar.shuffledHashJoin.optimizeBuildSide | true | Whether to allow Gluten to choose an optimal build side for shuffled hash join. |
| spark.gluten.sql.columnar.smallFileThreshold | 0.5 | The total size threshold of small files in table scan.To avoid small files being placed into the same partition, Gluten will try to distribute small files into different partitions when the total size of small files is below this threshold. |
| spark.gluten.sql.columnar.sort | true | Enable or disable columnar sort. |
| spark.gluten.sql.columnar.sortMergeJoin | true | Enable or disable columnar sortMergeJoin. This should be set with preferSortMergeJoin=false. |
| spark.gluten.sql.columnar.tableCache | false | Enable or disable columnar table cache. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,8 @@ class GlutenConfig(conf: SQLConf) extends GlutenCoreConfig(conf) {

def extendedExpressionTransformer: String = getConf(EXTENDED_EXPRESSION_TRAN_CONF)

def smallFileThreshold: Double = getConf(SMALL_FILE_THRESHOLD)

def expressionBlacklist: Set[String] = {
val blacklistSet = getConf(EXPRESSION_BLACK_LIST)
.map(_.toLowerCase(Locale.ROOT).split(",").map(_.trim()).filter(_.nonEmpty).toSet)
Expand Down Expand Up @@ -1577,4 +1579,14 @@ object GlutenConfig extends ConfigRegistry {
.doc("Enable or disable columnar collectTail.")
.booleanConf
.createWithDefault(true)

val SMALL_FILE_THRESHOLD =
buildConf("spark.gluten.sql.columnar.smallFileThreshold")
.doc(
"The total size threshold of small files in table scan." +
"To avoid small files being placed into the same partition, " +
"Gluten will try to distribute small files into different partitions when the " +
"total size of small files is below this threshold.")
.doubleConf
.createWithDefault(0.5)
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@
*/
package org.apache.gluten.utils

import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.utils.PartitionsUtil.regeneratePartition

import org.apache.spark.Partition
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.datasources.{BucketingUtils, FilePartition, HadoopFsRelation, PartitionDirectory}
import org.apache.spark.sql.execution.datasources.{BucketingUtils, FilePartition, HadoopFsRelation, PartitionDirectory, PartitionedFile}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.collection.BitSet

import org.apache.hadoop.fs.Path

import scala.collection.mutable

case class PartitionsUtil(
relation: HadoopFsRelation,
requiredSchema: StructType,
Expand Down Expand Up @@ -96,7 +100,10 @@ case class PartitionsUtil(
}
.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)
val inputPartitions =
FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)

regeneratePartition(inputPartitions, GlutenConfig.get.smallFileThreshold)
}

private def genBucketedPartitionSeq(): Seq[Partition] = {
Expand Down Expand Up @@ -140,3 +147,90 @@ case class PartitionsUtil(
output.find(_.name == colName)
}
}

object PartitionsUtil {

/**
* Regenerate the partitions by balancing the number of files per partition and total size per
* partition.
*/
def regeneratePartition(
inputPartitions: Seq[FilePartition],
smallFileThreshold: Double): Seq[FilePartition] = {

// Flatten and sort descending by file size.
val filesSorted: Seq[(PartitionedFile, Long)] =
inputPartitions
.flatMap(_.files)
.map(f => (f, f.length))
.sortBy(_._2)(Ordering.Long.reverse)

val partitions = Array.fill(inputPartitions.size)(mutable.ArrayBuffer.empty[PartitionedFile])

def addToBucket(
heap: mutable.PriorityQueue[(Long, Int, Int)],
file: PartitionedFile,
sz: Long): Unit = {
val (load, numFiles, idx) = heap.dequeue()
partitions(idx) += file
heap.enqueue((load + sz, numFiles + 1, idx))
}

// First by load, then by numFiles.
val heapByFileSize =
mutable.PriorityQueue.empty[(Long, Int, Int)](
Ordering
.by[(Long, Int, Int), (Long, Int)] {
case (load, numFiles, _) =>
(load, numFiles)
}
.reverse
)

if (smallFileThreshold > 0) {
val smallFileTotalSize = filesSorted.map(_._2).sum * smallFileThreshold
// First by numFiles, then by load.
val heapByFileNum =
mutable.PriorityQueue.empty[(Long, Int, Int)](
Ordering
.by[(Long, Int, Int), (Int, Long)] {
case (load, numFiles, _) =>
(numFiles, load)
}
.reverse
)

inputPartitions.indices.foreach(i => heapByFileNum.enqueue((0L, 0, i)))

var numSmallFiles = 0
var smallFileSize = 0L
// Enqueue small files to the least number of files and the least load.
filesSorted.reverse.takeWhile(f => f._2 + smallFileSize <= smallFileTotalSize).foreach {
case (file, sz) =>
addToBucket(heapByFileNum, file, sz)
numSmallFiles += 1
smallFileSize += sz
}

// Move buckets from heapByFileNum to heapByFileSize.
while (heapByFileNum.nonEmpty) {
heapByFileSize.enqueue(heapByFileNum.dequeue())
}

// Finally, enqueue remaining files.
filesSorted.take(filesSorted.size - numSmallFiles).foreach {
case (file, sz) =>
addToBucket(heapByFileSize, file, sz)
}
} else {
inputPartitions.indices.foreach(i => heapByFileSize.enqueue((0L, 0, i)))

filesSorted.foreach {
case (file, sz) =>
addToBucket(heapByFileSize, file, sz)
}
}

partitions.zipWithIndex.map { case (p, idx) => FilePartition(idx, p.toArray) }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.utils

import org.apache.gluten.execution.PartitionedFileUtilShim

import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}

import org.scalatest.funsuite.AnyFunSuite

class PartitionsUtilSuite extends AnyFunSuite {

private def makePartitionedFile(path: String, length: Long): PartitionedFile =
PartitionedFileUtilShim.makePartitionedFileFromPath(path, length)

private def makeFilePartitions(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that sometimes the length of the output from makeFilePartitions does not match the specified numPartitions value in the input. For instance, consider the following code:

val files = (1 to 50).map(i => makePartitionedFile(s"f$i", i * 10))
val initialPartitions = makeFilePartitions(files, 20)

The length of initialPartitions is 17 instead of 20.

files: Seq[PartitionedFile],
numPartitions: Int): Seq[FilePartition] = {
val numGroups = files.size / numPartitions +
(if (files.size % numPartitions == 0) 0 else 1)
files.grouped(numGroups).toSeq.zipWithIndex.map {
case (p, idx) => FilePartition(idx, p.toArray)
}
}

test("large files are distributed evenly by size") {
val files = Seq(
makePartitionedFile("f1", 100),
makePartitionedFile("f2", 90),
makePartitionedFile("f3", 80),
makePartitionedFile("f4", 70)
)
val initialPartitions = makeFilePartitions(files, 2)

val result = PartitionsUtil.regeneratePartition(initialPartitions, 0.0)

assert(result.size === 2)

val sizes = result.map(_.files.map(_.length).sum)
assert(sizes.forall(_ === 170))
}

test("small files are distributed evenly by number of files") {
val files = (1 to 10).map(i => makePartitionedFile(s"f$i", 10))
val initialPartitions = makeFilePartitions(files, 5)

val result = PartitionsUtil.regeneratePartition(initialPartitions, 1.0)

assert(result.size === 5)
val counts = result.map(_.files.length)
assert(counts.forall(_ === 2))
}

test("small files should not be placed into one partition") {
val files = Seq(
makePartitionedFile("f1", 10),
makePartitionedFile("f2", 20),
makePartitionedFile("f3", 30),
makePartitionedFile("f4", 40),
makePartitionedFile("f5", 100)
)
val initialPartitions = makeFilePartitions(files, 2)

// Only "f5" is not small file.
val result = PartitionsUtil.regeneratePartition(initialPartitions, 0.5)

assert(result.size === 2)
assert(result.forall(_.files.exists(_.length <= 40)))
}

test("zero length files") {
val files = Seq(
makePartitionedFile("f1", 0),
makePartitionedFile("f2", 0)
)
val initialPartitions = makeFilePartitions(files, 2)

val result = PartitionsUtil.regeneratePartition(initialPartitions, 0.0)

assert(result.size === 2)
assert(result.count(_.files.nonEmpty) === 2)
}

test("empty inputs") {
val result = PartitionsUtil.regeneratePartition(Seq.empty, 0.5)
assert(result.size === 0)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ class GlutenSQLQueryTestSuite
.set("spark.memory.offHeap.size", "1024MB")
.set("spark.plugins", "org.apache.gluten.GlutenPlugin")
.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
.set(GlutenConfig.SMALL_FILE_THRESHOLD.key, "0")

if (isCHBackend) {
conf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ class GlutenSQLQueryTestSuite
.set("spark.memory.offHeap.size", "1024MB")
.set("spark.plugins", "org.apache.gluten.GlutenPlugin")
.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
.set(GlutenConfig.SMALL_FILE_THRESHOLD.key, "0")

if (isCHBackend) {
conf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ class GlutenSQLQueryTestSuite
.set("spark.memory.offHeap.size", "1024MB")
.set("spark.plugins", "org.apache.gluten.GlutenPlugin")
.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
.set(GlutenConfig.SMALL_FILE_THRESHOLD.key, "0")

if (isCHBackend) {
conf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class GlutenSQLQueryTestSuite
.set("spark.memory.offHeap.size", "1024MB")
.set("spark.plugins", "org.apache.gluten.GlutenPlugin")
.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
.set(GlutenConfig.SMALL_FILE_THRESHOLD.key, "0")

if (isCHBackend) {
conf
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.spark.sql.execution.datasources.PartitionedFile

object PartitionedFileUtilShim {
// Helper method to create PartitionedFile from path and length.
def makePartitionedFileFromPath(path: String, length: Long): PartitionedFile = {
PartitionedFile(null, path, 0, length, Array.empty)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.spark.sql.execution.datasources.PartitionedFile

object PartitionedFileUtilShim {
// Helper method to create PartitionedFile from path and length.
def makePartitionedFileFromPath(path: String, length: Long): PartitionedFile = {
PartitionedFile(null, path, 0, length, Array.empty)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.execution.datasources.PartitionedFile

object PartitionedFileUtilShim {
// Helper method to create PartitionedFile from path and length.
def makePartitionedFileFromPath(path: String, length: Long): PartitionedFile = {
PartitionedFile(null, SparkPath.fromPathString(path), 0, length, Array.empty)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.gluten.execution

import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.PartitionedFileUtil
Expand Down Expand Up @@ -148,4 +149,8 @@ object PartitionedFileUtilShim {
}
}

// Helper method to create PartitionedFile from path and length.
def makePartitionedFileFromPath(path: String, length: Long): PartitionedFile = {
PartitionedFile(null, SparkPath.fromPathString(path), 0, length, Array.empty)
}
}
Loading