Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@
*/
package org.apache.kyuubi.plugin.spark.authz.rule.rowfilter

import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.SparkStrategy

case class FilterDataSourceV2Strategy(spark: SparkSession) extends Strategy {
case class FilterDataSourceV2Strategy(spark: SparkSession) extends SparkStrategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case ObjectFilterPlaceHolder(child) if child.nodeName == "ShowNamespaces" =>
spark.sessionState.planner.plan(child)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,10 @@ import org.scalatest.Assertions._

import org.apache.kyuubi.Utils
import org.apache.kyuubi.plugin.spark.authz.RangerTestUsers._
import org.apache.kyuubi.plugin.spark.authz.V2JdbcTableCatalogPrivilegesBuilderSuite._
import org.apache.kyuubi.plugin.spark.authz.ranger.DeltaCatalogRangerSparkExtensionSuite._

trait SparkSessionProvider {
protected val catalogImpl: String
protected def supportPurge: Boolean = true
protected def format: String = if (catalogImpl == "hive") "hive" else "parquet"

protected val extension: SparkSessionExtensions => Unit = _ => ()
Expand Down Expand Up @@ -85,26 +84,29 @@ trait SparkSessionProvider {

protected val sql: String => DataFrame = spark.sql

protected def doAs[T](user: String, f: => T): T = {
protected def doAs[T](user: String, f: => T, unused: String = ""): T = {

@pan3793 pan3793 Nov 24, 2025

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to keep both, I have to add a dummy parameter, otherwise, the compiler complains

[ERROR] [Error] /Users/chengpan/Projects/apache-kyuubi/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala:94: double definition:
protected def doAs[T](user: String, f: => T): T at line 87 and
protected def doAs[T](user: String)(f: => T): T at line 94
have same type after erasure: (user: String, f: Function0): Object

UserGroupInformation.createRemoteUser(user).doAs[T](
new PrivilegedExceptionAction[T] {
override def run(): T = f
})
}

protected def doAs[T](user: String)(f: => T): T = {
UserGroupInformation.createRemoteUser(user).doAs[T](
new PrivilegedExceptionAction[T] {
override def run(): T = f
})
}

protected def withCleanTmpResources[T](res: Seq[(String, String)])(f: => T): T = {
try {
f
} finally {
res.foreach {
case (t, "table") => doAs(
admin, {
val purgeOption =
if (isCatalogSupportPurge(
spark.sessionState.catalogManager.currentCatalog.name())) {
"PURGE"
} else ""
sql(s"DROP TABLE IF EXISTS $t $purgeOption")
})
case (t, "table") => doAs(admin) {
val purgeOption = if (supportPurge) "PURGE" else ""
sql(s"DROP TABLE IF EXISTS $t $purgeOption")
}
Comment on lines -99 to +109

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

def doAs[T](user: String)(f: => T) has more pretty format here.

case (db, "database") => doAs(admin, sql(s"DROP DATABASE IF EXISTS $db"))
case (fn, "function") => doAs(admin, sql(s"DROP FUNCTION IF EXISTS $fn"))
case (view, "view") => doAs(admin, sql(s"DROP VIEW IF EXISTS $view"))
Expand All @@ -118,12 +120,4 @@ trait SparkSessionProvider {
protected def checkAnswer(user: String, query: String, result: Seq[Row]): Unit = {
doAs(user, assert(sql(query).collect() === result))
}

private def isCatalogSupportPurge(catalogName: String): Boolean = {
val unsupportedCatalogs = Set(v2JdbcTableCatalogClassName, deltaCatalogClassName)
spark.conf.getOption(s"spark.sql.catalog.$catalogName") match {
case Some(catalog) if !unsupportedCatalogs.contains(catalog) => true
case _ => false
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.kyuubi.util.AssertionUtils._
@DeltaTest
class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
override protected val catalogImpl: String = "hive"
override protected val supportPurge: Boolean = false
override protected val sqlExtensions: String = "io.delta.sql.DeltaSparkSessionExtension"

val namespace1 = deltaNamespace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.kyuubi.util.AssertionUtils._
@PaimonTest
class PaimonCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
override protected val catalogImpl: String = "hive"
override protected val supportPurge: Boolean = false
private def isSupportedVersion = isScalaV212
override protected val sqlExtensions: String =
if (isSupportedVersion) "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
*/
class V2JdbcTableCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
override protected val catalogImpl: String = "in-memory"
override protected val supportPurge: Boolean = false

val catalogV2 = "testcat"
val jdbcCatalogV2 = "jdbc2"
Expand Down
Loading