Skip to content

Commit 694b1d0

Browse files
authored
Merge branch 'main' into fix-delta-cm-partition-filter
2 parents abedd16 + 4634c80 commit 694b1d0

102 files changed

Lines changed: 3017 additions & 1426 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.asf.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ github:
4848
discussions: true
4949
wiki: false
5050
projects: true
51+
copilot_code_review:
52+
enabled: true
53+
review_drafts: false
54+
review_on_push: true
55+
5156
notifications:
5257
commits: commits@gluten.apache.org
5358
issues: commits@gluten.apache.org

.github/workflows/util/install-spark-resources.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ if [[ "${BASH_SOURCE[0]}" == "${0}" ]]; then
118118
4.0)
119119
# Spark-4.0, scala 2.12 // using 2.12 as a hack as 4.0 does not have 2.13 suffix
120120
cd ${INSTALL_DIR} && \
121-
install_spark "4.0.1" "3" "2.12"
121+
install_spark "4.0.2" "3" "2.12"
122122
mv /opt/shims/spark40/spark_home/assembly/target/scala-2.12 /opt/shims/spark40/spark_home/assembly/target/scala-2.13
123123
;;
124124
4.1)

.github/workflows/velox_backend_arm.yml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,20 +137,21 @@ jobs:
137137
138138
cpp-test-udf-test:
139139
runs-on: ubuntu-24.04-arm
140-
container: apache/gluten:centos-8-jdk8
140+
container: apache/gluten:centos-9-jdk8
141141
steps:
142142
- uses: actions/checkout@v4
143143
- name: Get Ccache
144144
uses: actions/cache/restore@v4
145145
with:
146146
path: '${{ env.CCACHE_DIR }}'
147-
key: ccache-centos8-release-shared-${{runner.arch}}-${{github.sha}}
147+
key: ccache-centos9-release-shared-${{runner.arch}}-${{github.sha}}
148148
restore-keys: |
149-
ccache-centos8-release-shared-${{runner.arch}}
149+
ccache-centos9-release-shared-${{runner.arch}}
150150
- name: Build Gluten native libraries
151151
run: |
152152
df -a
153-
bash dev/ci-velox-buildshared-centos-8.sh
153+
sed -i "s|gflags_static|gflags_shared|g" /usr/local/lib/cmake/folly/folly-targets.cmake # TODO: remove after upgrading folly to 2024.09.30 or later which has fixed the gflags linkage issue
154+
bash dev/ci-velox-buildshared-centos-9.sh
154155
ccache -s
155156
- name: Run CPP unit test
156157
run: |

.github/workflows/velox_backend_cache.yml

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -83,34 +83,34 @@ jobs:
8383
path: '${{ env.CCACHE_DIR }}'
8484
key: ccache-centos8-release-default-${{runner.arch}}-${{github.sha}}
8585

86-
cache-shared-lib-centos-8:
86+
cache-shared-lib-centos-9:
8787
if: ${{ startsWith(github.repository, 'apache/') }}
8888
runs-on: ${{ matrix.os }}
8989
strategy:
9090
matrix:
9191
os: [ ubuntu-22.04, ubuntu-24.04-arm ]
92-
container: apache/gluten:centos-8-jdk8
92+
container: apache/gluten:centos-9-jdk8
9393
steps:
9494
- uses: actions/checkout@v4
9595
- name: Get Ccache
9696
uses: actions/cache/restore@v3
9797
with:
9898
path: '${{ env.CCACHE_DIR }}'
99-
key: ccache-centos8-release-shared-${{runner.arch}}-${{github.sha}}
99+
key: ccache-centos9-release-shared-${{runner.arch}}-${{github.sha}}
100100
restore-keys: |
101-
ccache-centos8-release-shared-${{runner.arch}}
101+
ccache-centos9-release-shared-${{runner.arch}}
102102
- name: Build Gluten shared libraries
103103
run: |
104104
df -a
105105
export CCACHE_MAXSIZE=1G
106-
bash dev/ci-velox-buildshared-centos-8.sh
106+
bash dev/ci-velox-buildshared-centos-9.sh
107107
ccache -s
108108
- name: Save Ccache
109109
uses: actions/cache/save@v3
110110
id: ccache
111111
with:
112112
path: '${{ env.CCACHE_DIR }}'
113-
key: ccache-centos8-release-shared-${{runner.arch}}-${{github.sha}}
113+
key: ccache-centos9-release-shared-${{runner.arch}}-${{github.sha}}
114114

115115
cache-enhanced-native-lib-centos-7:
116116
if: ${{ startsWith(github.repository, 'apache/') }}
@@ -139,7 +139,7 @@ jobs:
139139
path: '${{ env.CCACHE_DIR }}'
140140
key: ccache-enhanced-centos7-release-default-${{github.sha}}
141141

142-
cache-shared-lib-centos-9:
142+
cache-shared-lib-centos-9-cudf:
143143
if: ${{ startsWith(github.repository, 'apache/') }}
144144
runs-on: ${{ matrix.os }}
145145
strategy:
@@ -156,7 +156,7 @@ jobs:
156156
uses: actions/cache/restore@v3
157157
with:
158158
path: '${{ env.CCACHE_DIR }}'
159-
key: ccache-centos9-release-shared-${{runner.arch}}-${{github.sha}}
159+
key: ccache-centos9-cudf-release-shared-${{runner.arch}}-${{github.sha}}
160160
restore-keys: |
161161
ccache-centos9-release-shared-${{runner.arch}}
162162
- name: Build Gluten shared libraries
@@ -187,4 +187,4 @@ jobs:
187187
id: ccache
188188
with:
189189
path: '${{ env.CCACHE_DIR }}'
190-
key: ccache-centos9-release-shared-${{runner.arch}}-${{github.sha}}
190+
key: ccache-centos9-cudf-release-shared-${{runner.arch}}-${{github.sha}}

.github/workflows/velox_backend_x86.yml

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1168,20 +1168,21 @@ jobs:
11681168
11691169
cpp-test-udf-test:
11701170
runs-on: ubuntu-22.04
1171-
container: apache/gluten:centos-8-jdk8
1171+
container: apache/gluten:centos-9-jdk8
11721172
steps:
11731173
- uses: actions/checkout@v4
11741174
- name: Get Ccache
11751175
uses: actions/cache/restore@v4
11761176
with:
11771177
path: '${{ env.CCACHE_DIR }}'
1178-
key: ccache-centos8-release-shared-${{runner.arch}}-${{github.sha}}
1178+
key: ccache-centos9-release-shared-${{runner.arch}}-${{github.sha}}
11791179
restore-keys: |
1180-
ccache-centos8-release-shared-${{runner.arch}}
1180+
ccache-centos9-release-shared-${{runner.arch}}
11811181
- name: Build Gluten native libraries
11821182
run: |
11831183
df -a
1184-
bash dev/ci-velox-buildshared-centos-8.sh
1184+
sed -i "s|gflags_static|gflags_shared|g" /usr/local/lib/cmake/folly/folly-targets.cmake # TODO remove after image update
1185+
bash dev/ci-velox-buildshared-centos-9.sh
11851186
ccache -s
11861187
- name: Run CPP unit test
11871188
run: |
@@ -1245,9 +1246,9 @@ jobs:
12451246
uses: actions/cache/restore@v4
12461247
with:
12471248
path: '${{ env.CCACHE_DIR }}'
1248-
key: ccache-centos9-release-shared-${{runner.arch}}-${{github.sha}}
1249+
key: ccache-centos9-cudf-release-shared-${{runner.arch}}-${{github.sha}}
12491250
restore-keys: |
1250-
ccache-centos9-release-shared-${{runner.arch}}
1251+
ccache-centos9-cudf-release-shared-${{runner.arch}}
12511252
- name: Build Gluten native libraries
12521253
run: |
12531254
docker run -v $GITHUB_WORKSPACE:/work -w /work apache/gluten:centos-9-jdk8-cudf bash -c "
@@ -1307,7 +1308,7 @@ jobs:
13071308
pip3 install setuptools==77.0.3 && \
13081309
pip3 install pyspark==3.5.5 cython && \
13091310
pip3 install pandas==2.2.3 pyarrow==20.0.0
1310-
- name: Prepare Spark Resources for Spark 4.0.1 #TODO remove after image update
1311+
- name: Prepare Spark Resources for Spark 4.0.2 #TODO remove after image update
13111312
run: |
13121313
rm -rf /opt/shims/spark40
13131314
bash .github/workflows/util/install-spark-resources.sh 4.0
@@ -1358,7 +1359,7 @@ jobs:
13581359
with:
13591360
name: arrow-jars-centos-7-${{github.sha}}
13601361
path: /root/.m2/repository/org/apache/arrow/
1361-
- name: Prepare Spark Resources for Spark 4.0.1 #TODO remove after image update
1362+
- name: Prepare Spark Resources for Spark 4.0.2 #TODO remove after image update
13621363
run: |
13631364
rm -rf /opt/shims/spark40
13641365
bash .github/workflows/util/install-spark-resources.sh 4.0

backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHExpressionUtil.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ object CHExpressionUtil {
200200
REGR_SLOPE -> DefaultValidator(),
201201
REGR_INTERCEPT -> DefaultValidator(),
202202
REGR_SXY -> DefaultValidator(),
203+
BITMAP_CONSTRUCT_AGG -> DefaultValidator(),
203204
TO_UTC_TIMESTAMP -> UtcTimestampValidator(),
204205
FROM_UTC_TIMESTAMP -> UtcTimestampValidator(),
205206
STACK -> DefaultValidator(),

backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,8 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
151151
nativeBufferSize,
152152
GlutenConfig.get.columnarShuffleReallocThreshold,
153153
GlutenConfig.get.columnarShufflePartitionBufferEvictThreshold,
154-
partitionWriterHandle
154+
partitionWriterHandle,
155+
false
155156
)
156157
case SortShuffleWriterType =>
157158
shuffleWriterJniWrapper.createSortShuffleWriter(
Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.gluten.delta
18+
19+
import org.apache.gluten.sql.shims.SparkShimLoader
20+
21+
import org.apache.spark.sql.SparkSession
22+
import org.apache.spark.sql.delta.GlutenDeltaParquetFileFormat
23+
import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
24+
import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArrayFormat, StoredBitmap}
25+
import org.apache.spark.sql.delta.storage.dv.HadoopFileSystemDVStore
26+
import org.apache.spark.sql.execution.datasources.PartitionedFile
27+
28+
import org.apache.hadoop.fs.Path
29+
30+
import java.util.{ArrayList => JArrayList}
31+
32+
import scala.collection.JavaConverters._
33+
import scala.util.control.NonFatal
34+
35+
object DeltaDeletionVectorScanInfo {
36+
/**
 * Enumeration of the row index filter kinds carried in [[DeletionVectorInfo]].
 *
 * Values are declared in a fixed order (ids 0, 1, 2); their names are the same strings that
 * `parseRowIndexFilterType` accepts.
 */
object RowIndexFilterType extends Enumeration {
  type RowIndexFilterType = Value

  // Used when a file carries no deletion vector metadata at all.
  val KEEP_ALL: Value = Value
  // NOTE(review): presumably mirrors Delta's "filter rows whose index is in the bitmap" - confirm.
  val IF_CONTAINED: Value = Value
  // NOTE(review): presumably mirrors Delta's "filter rows whose index is not in the bitmap" - confirm.
  val IF_NOT_CONTAINED: Value = Value
}
40+
41+
import RowIndexFilterType._
42+
43+
/**
 * Deletion vector details resolved for a single partitioned file.
 *
 * Equality and hashing are defined over the *contents* of `serializedDeletionVector`. The
 * compiler-generated case class implementations would compare the array by reference, so two
 * instances built from the same deletion vector would never be equal.
 *
 * @param hasDeletionVector
 *   whether deletion vector metadata was present for the file
 * @param rowIndexFilterType
 *   the row index filter type parsed from the file metadata (`KEEP_ALL` when absent)
 * @param cardinality
 *   the cardinality reported by the deletion vector descriptor (`0` when absent)
 * @param serializedDeletionVector
 *   the serialized bitmap payload (empty when absent)
 */
final case class DeletionVectorInfo(
    hasDeletionVector: Boolean,
    rowIndexFilterType: RowIndexFilterType,
    cardinality: Long,
    serializedDeletionVector: Array[Byte]) {

  override def equals(other: Any): Boolean = other match {
    case that: DeletionVectorInfo =>
      hasDeletionVector == that.hasDeletionVector &&
      rowIndexFilterType == that.rowIndexFilterType &&
      cardinality == that.cardinality &&
      java.util.Arrays.equals(serializedDeletionVector, that.serializedDeletionVector)
    case _ => false
  }

  override def hashCode(): Int = {
    // Must stay consistent with equals: hash the array contents, not its identity.
    val fieldHashes = Seq(
      hasDeletionVector.##,
      rowIndexFilterType.##,
      cardinality.##,
      java.util.Arrays.hashCode(serializedDeletionVector))
    fieldHashes.foldLeft(17)((acc, h) => 31 * acc + h)
  }
}
48+
49+
/**
 * Per-file scan information produced by `extract`.
 *
 * `normalizedOtherMetadataColumns` holds the file's other constant metadata columns with the two
 * row-index-filter entries removed; `deletionVectorInfo` holds the deletion vector details
 * resolved from those removed entries.
 */
final case class PartitionFileScanInfo(
50+
normalizedOtherMetadataColumns: Map[String, Object],
51+
deletionVectorInfo: DeletionVectorInfo)
52+
53+
// Key into the per-file metadata map whose value (via toString) is parsed as a Base64-encoded
// deletion vector descriptor.
private val RowIndexFilterIdEncoded =
54+
GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED
55+
// Key into the per-file metadata map whose value (via toString) is parsed as a row index filter
// type name.
private val RowIndexFilterTypeKey =
56+
GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE
57+
58+
/**
 * Builds the [[PartitionFileScanInfo]] for one file.
 *
 * The two row-index-filter entries are stripped from the returned metadata map and are instead
 * represented by the resolved [[DeletionVectorInfo]].
 *
 * @param spark
 *   session passed through to deletion vector resolution
 * @param partitionColumnCount
 *   number of partition columns, passed through to table path resolution
 * @param file
 *   the partitioned file to inspect
 */
def extract(
    spark: SparkSession,
    partitionColumnCount: Int,
    file: PartitionedFile): PartitionFileScanInfo = {
  val allColumns = otherMetadataColumns(file)
  PartitionFileScanInfo(
    normalizedOtherMetadataColumns = allColumns - RowIndexFilterIdEncoded - RowIndexFilterTypeKey,
    deletionVectorInfo = extractDeletionVectorInfo(spark, partitionColumnCount, file, allColumns)
  )
}
67+
68+
/**
 * Applies `extract` to every file, preserving input order.
 *
 * @return
 *   one [[PartitionFileScanInfo]] per input file
 */
def extractAll(
    spark: SparkSession,
    partitionColumnCount: Int,
    files: Seq[PartitionedFile]): Seq[PartitionFileScanInfo] = {
  for (file <- files) yield extract(spark, partitionColumnCount, file)
}
74+
75+
/**
 * Java-collection variant of `extractAll`.
 *
 * The results are copied into a fresh `java.util.ArrayList`, so the returned list is not a view
 * over a Scala collection.
 */
def extractAllFromJava(
    spark: SparkSession,
    partitionColumnCount: Int,
    files: java.util.List[PartitionedFile]): java.util.List[PartitionFileScanInfo] = {
  val scanInfos = extractAll(spark, partitionColumnCount, files.asScala.toSeq)
  new JArrayList[PartitionFileScanInfo](scanInfos.asJava)
}
81+
82+
/**
 * Resolves the deletion vector details for `file` from its metadata map.
 *
 * The descriptor entry and the filter-type entry must be both present or both absent. When both
 * are absent, a "no deletion vector" value is returned; when both are present, the descriptor is
 * parsed and its bitmap payload is loaded and serialized.
 *
 * @throws IllegalStateException
 *   if exactly one of the two entries is present
 */
private def extractDeletionVectorInfo(
    spark: SparkSession,
    partitionColumnCount: Int,
    file: PartitionedFile,
    metadata: Map[String, Object]): DeletionVectorInfo = {
  val encodedDescriptor = metadata.get(RowIndexFilterIdEncoded)
  val filterType = metadata.get(RowIndexFilterTypeKey)

  // Exactly one of the two entries being present is an inconsistent state.
  if (encodedDescriptor.isDefined != filterType.isDefined) {
    throw new IllegalStateException(
      s"Both $RowIndexFilterIdEncoded and $RowIndexFilterTypeKey must either be present or absent")
  }

  val resolved = for {
    rawDescriptor <- encodedDescriptor
    rawFilterType <- filterType
  } yield {
    val descriptor = parseDescriptor(rawDescriptor.toString)
    val payload = serializePayload(spark, partitionColumnCount, file, descriptor)
    DeletionVectorInfo(
      hasDeletionVector = true,
      rowIndexFilterType = parseRowIndexFilterType(rawFilterType.toString),
      cardinality = descriptor.cardinality,
      serializedDeletionVector = payload)
  }

  resolved.getOrElse(DeletionVectorInfo(false, KEEP_ALL, 0L, Array.emptyByteArray))
}
106+
107+
/**
 * Returns the file's other constant metadata column values (obtained through the Spark shim) as
 * an immutable Scala map; a `null` result from the shim is treated as an empty map.
 */
private def otherMetadataColumns(file: PartitionedFile): Map[String, Object] = {
  Option(SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(file))
    .fold(Map.empty[String, Object])(_.asScala.toMap)
}
116+
117+
/**
 * Decodes a Base64-encoded deletion vector descriptor.
 *
 * @throws IllegalArgumentException
 *   wrapping any non-fatal failure raised while deserializing
 */
private def parseDescriptor(encodedDescriptor: String): DeletionVectorDescriptor = {
  // Try only captures non-fatal throwables, so fatal errors still propagate unchanged.
  scala.util.Try(DeletionVectorDescriptor.deserializeFromBase64(encodedDescriptor)) match {
    case scala.util.Success(descriptor) => descriptor
    case scala.util.Failure(cause) =>
      throw new IllegalArgumentException("Unable to parse Delta deletion vector descriptor", cause)
  }
}
125+
126+
/**
 * Maps a filter type name to its [[RowIndexFilterType]] value.
 *
 * @throws IllegalStateException
 *   if the name is not one of the three known filter types
 */
private def parseRowIndexFilterType(filterType: String): RowIndexFilterType = {
  val knownTypes = Map(
    "IF_CONTAINED" -> IF_CONTAINED,
    "IF_NOT_CONTAINED" -> IF_NOT_CONTAINED,
    "KEEP_ALL" -> KEEP_ALL)
  knownTypes.getOrElse(
    filterType,
    throw new IllegalStateException(s"Unexpected row index filter type: $filterType"))
}
135+
136+
/**
 * Loads the bitmap referenced by `descriptor` and serializes it in the portable roaring bitmap
 * array format.
 *
 * @throws IllegalStateException
 *   if no table path can be resolved for `file`
 */
private def serializePayload(
    spark: SparkSession,
    partitionColumnCount: Int,
    file: PartitionedFile,
    descriptor: DeletionVectorDescriptor): Array[Byte] = {
  val tableRoot = Option(resolveTablePath(spark, partitionColumnCount, file)).getOrElse {
    throw new IllegalStateException(
      "Unable to resolve Delta table path while materializing deletion vector payload")
  }
  val store = new HadoopFileSystemDVStore(spark.sessionState.newHadoopConf())
  val bitmap = StoredBitmap.create(descriptor, tableRoot).load(store)
  bitmap.serializeAsByteArray(RoaringBitmapArrayFormat.Portable)
}
152+
153+
/**
 * Resolves the Delta table root directory for `file`.
 *
 * Resolution order:
 *   1. Walk `partitionColumnCount` levels up from the file's parent directory and use that
 *      directory if it contains a `_delta_log` entry.
 *   1. Otherwise, search upwards from the file's parent for the nearest directory that contains
 *      a `_delta_log` entry.
 *   1. Otherwise, fall back to the directory computed in step 1 even though it was not verified.
 *
 * The upward walk is null-safe: if `partitionColumnCount` exceeds the depth of the path (or the
 * file has no parent), the walk stops instead of throwing a NullPointerException.
 *
 * @return
 *   the resolved table path, or `null` when nothing could be resolved (callers check for this)
 */
private def resolveTablePath(
    spark: SparkSession,
    partitionColumnCount: Int,
    file: PartitionedFile): Path = {
  val fileParent = new Path(unescapePathName(file.filePath.toString)).getParent

  // Path.getParent yields null at the root, so carry the walk in an Option.
  val expectedRoot: Option[Path] =
    (0 until partitionColumnCount).foldLeft(Option(fileParent)) {
      (current, _) => current.flatMap(dir => Option(dir.getParent))
    }

  // Lazily enumerates fileParent and its ancestors; only evaluated if the expected root fails.
  def nearestDeltaAncestor: Option[Path] =
    Iterator
      .iterate(fileParent)(_.getParent)
      .takeWhile(_ != null)
      .find(isDeltaTablePath(spark, _))

  expectedRoot
    .filter(isDeltaTablePath(spark, _))
    .orElse(nearestDeltaAncestor)
    .orElse(expectedRoot)
    .orNull
}
172+
173+
/**
 * Reports whether `tablePath` contains a `_delta_log` entry.
 *
 * Any non-fatal failure while checking the file system is treated as "not a Delta table path".
 */
private def isDeltaTablePath(spark: SparkSession, tablePath: Path): Boolean = {
  val logDir = new Path(tablePath, "_delta_log")
  scala.util
    .Try(logDir.getFileSystem(spark.sessionState.newHadoopConf()).exists(logDir))
    .getOrElse(false)
}
181+
182+
/**
 * Replaces every `%XY` sequence (X and Y being hexadecimal digits) with the single character
 * whose code is 0xXY.
 *
 * A `%` that is not followed by two hexadecimal digits is copied through unchanged. `null` and
 * strings without any `%` are returned as-is.
 */
private def unescapePathName(path: String): String = {
  // Returns the decoded character code for an escape starting at `at`, or -1 if there is none.
  def escapeCodeAt(at: Int): Int = {
    if (path.charAt(at) != '%' || at + 2 >= path.length) {
      -1
    } else {
      val hi = Character.digit(path.charAt(at + 1), 16)
      val lo = Character.digit(path.charAt(at + 2), 16)
      if (hi < 0 || lo < 0) -1 else (hi << 4) | lo
    }
  }

  if (path == null || path.indexOf('%') == -1) {
    path
  } else {
    val out = new StringBuilder(path.length)
    var pos = 0
    while (pos < path.length) {
      val code = escapeCodeAt(pos)
      if (code >= 0) {
        out.append(code.toChar)
        pos += 3
      } else {
        out.append(path.charAt(pos))
        pos += 1
      }
    }
    out.toString()
  }
}
207+
}

0 commit comments

Comments
 (0)