Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,20 @@ abstract class JdbcDialect extends SupportServiceLoader with Logging {

def getSchemaHelper(): SchemaHelper

/**
* Switch the current database on the backend connection.
*
* Called during session open when the client specified a database in the connection URL
* (populated by the Hive JDBC driver into `USE_DATABASE`). Dialects that support an
* in-session database context switch (e.g. MySQL's `USE db`) should override this and
* return `true` on success; the default is a no-op that returns `false`, meaning the
* backend has no notion of "current database" that applies to this session.
*
* The return value is used by `JdbcOperationManager` to decide whether the requested
* database can be treated as the effective schema filter for metadata operations.
*/
def setCurrentDatabase(connection: Connection, database: String): Boolean = false
Comment thread
wangzhigang1999 marked this conversation as resolved.
Outdated

def cancelStatement(jdbcStatement: Statement): Unit = {
if (jdbcStatement != null) {
jdbcStatement.cancel()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,34 @@ class MySQLDialect extends JdbcDialect {
statement
}

override def getCatalogsOperation(): String = {
"SELECT CATALOG_NAME FROM INFORMATION_SCHEMA.SCHEMATA GROUP BY CATALOG_NAME"
}

override def getSchemasOperation(catalog: String, schema: String): String = {
// Alias the MySQL-native columns to the JDBC-standard names expected by
// DatabaseMetaData.getSchemas(): TABLE_CATALOG and TABLE_SCHEM.
val query = new StringBuilder(
"""SELECT CATALOG_NAME AS TABLE_CATALOG, SCHEMA_NAME AS TABLE_SCHEM
|FROM INFORMATION_SCHEMA.SCHEMATA
|""".stripMargin)

val filters = ArrayBuffer[String]()
if (StringUtils.isNotBlank(catalog)) {
filters += s"CATALOG_NAME LIKE '$catalog'"
}
if (StringUtils.isNotBlank(schema)) {
filters += s"SCHEMA_NAME LIKE '$schema'"
}

if (filters.nonEmpty) {
query.append(" WHERE ")
query.append(filters.mkString(" AND "))
}

query.toString()
}

override def getTablesQuery(
catalog: String,
schema: String,
Expand Down Expand Up @@ -122,6 +150,16 @@ class MySQLDialect extends JdbcDialect {
query.toString()
}

override def setCurrentDatabase(connection: Connection, database: String): Boolean = {
val stmt = connection.createStatement()
try {
stmt.execute(s"USE `$database`")
true
} finally {
stmt.close()
}
}

override def getTRowSetGenerator(): JdbcTRowSetGenerator = new MySQLTRowSetGenerator

override def getSchemaHelper(): SchemaHelper = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package org.apache.kyuubi.engine.jdbc.operation

import java.util

import org.apache.commons.lang3.StringUtils

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_JDBC_FETCH_SIZE, ENGINE_JDBC_OPERATION_INCREMENTAL_COLLECT}
Expand Down Expand Up @@ -91,7 +93,12 @@ class JdbcOperationManager(conf: KyuubiConf) extends OperationManager("JdbcOpera
schemaName: String,
tableName: String,
tableTypes: util.List[String]): Operation = {
val query = dialect.getTablesQuery(catalogName, schemaName, tableName, tableTypes)
val effectiveSchema = if (StringUtils.isBlank(schemaName) || schemaName == "%") {
session.asInstanceOf[JdbcSessionImpl].effectiveDatabase.orNull
} else {
schemaName
}
val query = dialect.getTablesQuery(catalogName, effectiveSchema, tableName, tableTypes)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't recall the details of the initial code of this part, but the API looks problematic.

the Hive Thrift GetTablesOperation API is derived from the JDBC API, and that's why they look almost identical, for JDBC engine, such a call should directly map to the JDBC API instead of requesting a SQL (if you take a look at the JDBC driver implementation, you may find some impls are non-SQL-based), the current design complicates the whole thing.

@wangzhigang1999 wangzhigang1999 Apr 28, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for raising this — you're right, and I want to confirm I took it seriously rather than hand-wave it away. To make sure I understood, I ran the full set of DatabaseMetaData calls against three different drivers (mysql-connector-j 8.4.0, postgresql 42.7.2, clickhouse-jdbc 0.6.5) using a small Java probe and docker containers. Test fixtures were tailored to each backend (MySQL/PG had PK + FK + a function; ClickHouse used MergeTree which has no native PK/FK).

Cross-driver behavior of each API:

API MySQL (databaseTerm=CATALOG) MySQL (databaseTerm=SCHEMA) PostgreSQL ClickHouse
getCatalogs() ✓ implemented ⚠ empty (driver databaseTerm selection) ✓ implemented ✓ implemented
getSchemas() ⚠ empty (driver databaseTerm selection) ✓ implemented ✓ implemented — model has no schema layer
getTableTypes()
getTables(...)
getColumns(...)
getPrimaryKeys — MergeTree has no PK
getImported / Exported / CrossReference — no FK in CK
getFunctions(...) (no fixture created)
getTypeInfo()

Things that hold across all three drivers:

  • Every JDBC-standard metadata method is implemented — none threw SQLFeatureNotSupportedException; concepts that don't apply just return empty result sets. (The JDBC contract requires this; clients differ in how strictly they enforce it.)
  • Column names follow the JDBC spec (TABLE_CAT, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, DATA_TYPE, KEY_SEQ, PK_NAME, ...) — which is exactly what GetTablesOperation / GetColumnsOperation / etc. on the Hive Thrift side expect, since the Thrift API was derived from the JDBC API in the first place.
  • Driver-specific quirks are minor and well-defined — the databaseTerm selection is unique to mysql-connector-j; PG returns column labels in lowercase while MySQL/CK use uppercase (JDBC spec says case-insensitive, but engine-side string comparisons would need to match).

So the JDBC standard API really does cover every operation the dialect models today, across very different DBMS families. Concretely, this means the JdbcDialect.getXxxQuery family of methods could be deleted in favor of the engine calling connection.getMetaData.getXxx(...) directly — and dialects whose driver implements metadata via a non-SQL path (Avatica, etc.) would automatically be used the way they were designed to.

The few wrinkles I hit are all driver-side and resolvable in a few lines of engine-level adapter code, not by reaching for SQL:

  • getCatalogs / getSchemas are selected by mysql-connector-j's databaseTerm property (only one returns rows in a given mode) — handled by checking the property and routing, or by calling both and taking whichever is non-empty.
  • DATA_TYPE is int in the JDBC spec but the current SQL returns a string — java.sql.Types-to-name mapping is one helper.
  • The current SQL returns extra MySQL-specific columns (ENGINE, TABLE_ROWS, ...) but the Hive Thrift GetTables result schema is fixed to JDBC-standard columns, so those extras are already dropped during serialization today and aren't reaching clients — no real loss.

I'd like to keep this PR scoped to the URL-database bug since that's what it set out to fix, and migrating the dialect API to call DatabaseMetaData directly will touch every dialect, the operation manager, and every metadata test — not something I'd want to fold into a bug fix. Happy to open a follow-up issue tracking the migration if you agree with that split. WDYT?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

migrating the dialect API to call DatabaseMetaData directly will touch every dialect, the operation manager, and every metadata test

if we do that first, do we still need this patch?

@pan3793 pan3793 Apr 28, 2026

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No SQLFeatureNotSupportedException anywhere

Actually, the JDBC API Javadocs define behavior in a very clear way, it should not throw SQLFeatureNotSupportedException unless Javadocs say it can. Some tools, like DBeaver, may tolerate it if the JDBC driver vendor does not respect the API contract, but tools like DataGrip may not.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, we MUST avoid copying the code from mysql-connector-j, it's GPL-licensed ... try to read the docs or use black box testing to tackle the mysql-connector-j-related issue. (this will be a horrible experience ...)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

migrating the dialect API to call DatabaseMetaData directly will touch every dialect, the operation manager, and every metadata test

if we do that first, do we still need this patch?

Yes — the bug is the Hive JDBC client converting null schemaPattern to "%", and JDBC spec says "%" means "don't narrow by schema", so calling DatabaseMetaData.getTables(...) directly reproduces it. Verified on mysql-connector-j 8.4.0 (connection opened against jdbc:mysql://.../dbA):

conn.getCatalog() == "dbA"
getTables(null, "%",  "%", null) → tables from dbA AND dbB   ← bug reproduces
getTables("dbA", null, "%", null) → tables from dbA only     ← only narrows when explicit

So the engine still has to substitute the URL-scoped database into the call when the client sends null/"%". That routing logic (effectiveDatabase + the if-blank-then-route block in JdbcOperationManager) survives the refactor unchanged — only the line that builds a SQL string flips to conn.getMetaData.getTables(...). The setSchema-on-session-open machinery is also independent.

What the refactor does subsume: the two getCatalogsOperation / getSchemasOperation SQL implementations I added to MySQLDialect — those go away once the engine calls getCatalogs() / getSchemas() directly.

Happy either way on ordering — bug fix first then refactor, or refactor first and a smaller patch on top.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, we MUST avoid copying the code from mysql-connector-j, it's GPL-licensed ... try to read the docs or use black box testing to tackle the mysql-connector-j-related issue. (this will be a horrible experience ...)

Agreed, and thanks for catching this early. I've rewritten all my prior PR comments and the in-tree comment in MySQLDialect.setSchema to remove any reference / paraphrase of mysql-connector-j source — keeping only the documented databaseTerm property and black-box test observations. Also amended the FOLLOWUP commit message similarly. Will follow this rule going forward.

val normalizedConf = session.asInstanceOf[JdbcSessionImpl].normalizedConf
val fetchSize = normalizedConf.get(ENGINE_JDBC_FETCH_SIZE.key).map(_.toInt)
.getOrElse(session.sessionManager.getConf.get(ENGINE_JDBC_FETCH_SIZE))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ package org.apache.kyuubi.engine.jdbc.session
import java.sql.{Connection, DatabaseMetaData}

import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.jdbc.connection.ConnectionProvider
import org.apache.kyuubi.engine.jdbc.dialect.JdbcDialects
import org.apache.kyuubi.engine.jdbc.util.KyuubiJdbcUtils
import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager}
import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager, USE_DATABASE}
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion}

class JdbcSessionImpl(
Expand All @@ -45,6 +47,13 @@ class JdbcSessionImpl(

private var databaseMetaData: DatabaseMetaData = _

/**
* The database that was successfully applied to the backend connection during session open.
* `None` when no `USE_DATABASE` was requested, or when the request was for "default"
* (the Hive JDBC driver's fallback) and the backend had no such database.
*/
private[jdbc] var effectiveDatabase: Option[String] = None

val sessionConf: KyuubiConf = normalizeConf

private def normalizeConf: KyuubiConf = {
Expand All @@ -67,6 +76,20 @@ class JdbcSessionImpl(
sessionConf,
sessionConnection,
sessionConf.get(ENGINE_JDBC_SESSION_INITIALIZE_SQL))
conf.get(USE_DATABASE).foreach { database =>
try {
if (JdbcDialects.get(sessionConf).setCurrentDatabase(sessionConnection, database)) {
effectiveDatabase = Some(database)
info(s"Switched to database: $database")
}
} catch {
case NonFatal(e) if database == "default" =>
// The Hive JDBC driver sends "default" when the user didn't specify a database
// in the connection URL. Tolerate USE failure in that case so the session can
// still open against backends that have no "default" database.
warn(s"Failed to switch to database 'default', ignored.", e)
}
}
super.open()
info(s"The jdbc session is started.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.kyuubi.engine.jdbc.mysql

import java.sql.ResultSet
import java.sql.{DriverManager, ResultSet}

import scala.collection.mutable.ArrayBuffer

Expand Down Expand Up @@ -250,4 +250,102 @@ abstract class MySQLOperationSuite extends WithMySQLEngine with HiveJDBCTestHelp
statement.execute("drop database db2")
}
}

test("mysql - getTables scopes to database from connection URL (KYUUBI #7305)") {
withJdbcStatement() { statement =>
statement.execute("create database if not exists db7305a")
statement.execute("create table db7305a.ta(id bigint)" +
"ENGINE=InnoDB DEFAULT CHARSET=utf8;")
statement.execute("create database if not exists db7305b")
statement.execute("create table db7305b.tb(id bigint)" +
"ENGINE=InnoDB DEFAULT CHARSET=utf8;")
}

val hostAndPort = jdbcUrl.stripPrefix("jdbc:hive2://").split("/;").head
val scopedUrl = s"jdbc:hive2://$hostAndPort/db7305a;"
val conn = DriverManager.getConnection(scopedUrl, user, password)
try {
val tables = conn.getMetaData.getTables(null, null, "t%", null)
val found = ArrayBuffer[(String, String)]()
while (tables.next()) {
found += ((tables.getString(TABLE_SCHEMA), tables.getString(TABLE_NAME)))
}
assert(found.contains(("db7305a", "ta")))
assert(!found.exists { case (schema, _) => schema == "db7305b" })

// Hive driver converts schemaPattern=null to "%", and the fix routes "%" back to
// the effective database, so the behavior should match the call above.
val tablesPct = conn.getMetaData.getTables(null, "%", "t%", null)
val foundPct = ArrayBuffer[(String, String)]()
while (tablesPct.next()) {
foundPct += ((tablesPct.getString(TABLE_SCHEMA), tablesPct.getString(TABLE_NAME)))
}
assert(foundPct.contains(("db7305a", "ta")))
assert(!foundPct.exists { case (schema, _) => schema == "db7305b" })
} finally {
conn.close()
}

withJdbcStatement() { statement =>
statement.execute("drop table db7305a.ta")
statement.execute("drop database db7305a")
statement.execute("drop table db7305b.tb")
statement.execute("drop database db7305b")
}
}

test("mysql - getTables returns all tables when no database in URL") {
withJdbcStatement() { statement =>
statement.execute("create database if not exists db7305c")
statement.execute("create table db7305c.tc(id bigint)" +
"ENGINE=InnoDB DEFAULT CHARSET=utf8;")
statement.execute("create database if not exists db7305d")
statement.execute("create table db7305d.td(id bigint)" +
"ENGINE=InnoDB DEFAULT CHARSET=utf8;")
}

withJdbcStatement() { statement =>
val tables = statement.getConnection.getMetaData.getTables(null, null, "t%", null)
val found = ArrayBuffer[(String, String)]()
while (tables.next()) {
found += ((tables.getString(TABLE_SCHEMA), tables.getString(TABLE_NAME)))
}
// Without USE_DATABASE, the fix must not filter, so both dbs are visible.
assert(found.contains(("db7305c", "tc")))
assert(found.contains(("db7305d", "td")))

statement.execute("drop table db7305c.tc")
statement.execute("drop database db7305c")
statement.execute("drop table db7305d.td")
statement.execute("drop database db7305d")
}
}

test("mysql - getSchemas returns all schemas (KYUUBI #7305)") {
withJdbcStatement() { statement =>
statement.execute("create database if not exists db7305e")
statement.execute("create database if not exists db7305f")
}

withJdbcStatement() { statement =>
val rs = statement.getConnection.getMetaData.getSchemas
val schemas = ArrayBuffer[String]()
while (rs.next()) schemas += rs.getString("TABLE_SCHEM")
assert(schemas.contains("db7305e"))
assert(schemas.contains("db7305f"))

statement.execute("drop database db7305e")
statement.execute("drop database db7305f")
}
}

test("mysql - session open fails when URL specifies a non-existent database") {
val hostAndPort = jdbcUrl.stripPrefix("jdbc:hive2://").split("/;").head
val badUrl = s"jdbc:hive2://$hostAndPort/does_not_exist_db_7305;"
val ex = intercept[java.sql.SQLException] {
DriverManager.getConnection(badUrl, user, password).close()
}
// Error surface could come from the engine or JDBC driver; we only assert it is raised.
assert(ex != null)
}
}
Loading