Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,109 +16,35 @@
*/
package org.apache.kyuubi.engine.jdbc.dialect

import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.commons.lang3.StringUtils
import java.sql.Connection

import org.apache.kyuubi.engine.jdbc.clickhouse.{ClickHouseSchemaHelper, ClickHouseTRowSetGenerator}
import org.apache.kyuubi.engine.jdbc.schema.{JdbcTRowSetGenerator, SchemaHelper}
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.{COLUMN_NAME, TABLE_CATALOG, TABLE_NAME, TABLE_SCHEMA, TABLE_TYPE}
import org.apache.kyuubi.session.Session

class ClickHouseDialect extends JdbcDialect {
override def name(): String = "clickhouse"

override def getTRowSetGenerator(): JdbcTRowSetGenerator = new ClickHouseTRowSetGenerator

override def getSchemaHelper(): SchemaHelper = new ClickHouseSchemaHelper

override def getTablesQuery(
catalog: String,
schema: String,
tableName: String,
tableTypes: util.List[String]): String = {
val tTypes =
if (tableTypes == null || tableTypes.isEmpty) {
Set("BASE TABLE", "SYSTEM VIEW", "VIEW")
} else {
tableTypes.asScala.toSet
}
val query = new StringBuilder(
s"""
|SELECT TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE,
|TABLE_ROWS, DATA_LENGTH,
|TABLE_COLLATION, TABLE_COMMENT
|FROM INFORMATION_SCHEMA.TABLES
|""".stripMargin)

val filters = ArrayBuffer[String]()
if (StringUtils.isNotBlank(catalog)) {
filters += s"$TABLE_CATALOG = '$catalog'"
}
// clickhouse-jdbc's setCatalog/setSchema are mutually exclusive via the `databaseTerm`
// driver property; write both so the update always lands regardless of user config.
override def setSchema(conn: Connection, schema: String): Unit = setDatabase(conn, schema)

if (StringUtils.isNotBlank(schema)) {
filters += s"$TABLE_SCHEMA LIKE '$schema'"
}
override def setCatalog(conn: Connection, catalog: String): Unit = setDatabase(conn, catalog)

if (StringUtils.isNotBlank(tableName)) {
filters += s"$TABLE_NAME LIKE '$tableName'"
}
override def getCurrentSchema(conn: Connection): String = readDatabase(conn)

if (tTypes.nonEmpty) {
filters += s"(${
tTypes.map { tableType => s"$TABLE_TYPE = '$tableType'" }
.mkString(" OR ")
})"
}
override def getCatalog(conn: Connection): String = readDatabase(conn)

if (filters.nonEmpty) {
query.append(" WHERE ")
query.append(filters.mkString(" AND "))
}

query.toString()
private def setDatabase(conn: Connection, db: String): Unit = {
conn.setCatalog(db)
conn.setSchema(db)
}

override def getColumnsQuery(
session: Session,
catalogName: String,
schemaName: String,
tableName: String,
columnName: String): String = {
val query = new StringBuilder(
"""
|SELECT
|`TABLE_CATALOG`,`TABLE_SCHEMA`,`TABLE_NAME`,`COLUMN_NAME`,`ORDINAL_POSITION`,
|`COLUMN_DEFAULT`,`IS_NULLABLE`,`DATA_TYPE`,`CHARACTER_MAXIMUM_LENGTH`,
|`CHARACTER_OCTET_LENGTH`,`NUMERIC_PRECISION`,`NUMERIC_PRECISION_RADIX`,
|`NUMERIC_SCALE`,`DATETIME_PRECISION`,`CHARACTER_SET_CATALOG`,`CHARACTER_SET_SCHEMA`,
|`CHARACTER_SET_NAME`,`COLLATION_CATALOG`,`COLLATION_SCHEMA`,`COLLATION_NAME`,
|`DOMAIN_CATALOG`,`DOMAIN_SCHEMA`,`DOMAIN_NAME`, `EXTRA`, `COLUMN_COMMENT`, `COLUMN_TYPE`
|FROM information_schema.columns
|""".stripMargin)

val filters = ArrayBuffer[String]()
if (StringUtils.isNotEmpty(catalogName)) {
filters += s"$TABLE_CATALOG = '$catalogName'"
}
if (StringUtils.isNotEmpty(schemaName)) {
filters += s"$TABLE_SCHEMA LIKE '$schemaName'"
}
if (StringUtils.isNotEmpty(tableName)) {
filters += s"$TABLE_NAME LIKE '$tableName'"
}
if (StringUtils.isNotEmpty(columnName)) {
filters += s"$COLUMN_NAME LIKE '$columnName'"
}
private def readDatabase(conn: Connection): String = {
val c = conn.getCatalog
if (c != null && c.nonEmpty) c else conn.getSchema
}

if (filters.nonEmpty) {
query.append(" WHERE ")
query.append(filters.mkString(" AND "))
}
override def getTRowSetGenerator(): JdbcTRowSetGenerator = new ClickHouseTRowSetGenerator

query.toString()
}
override def getSchemaHelper(): SchemaHelper = new ClickHouseSchemaHelper
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,80 +16,40 @@
*/
package org.apache.kyuubi.engine.jdbc.dialect

import java.util
import java.sql.Connection

import org.apache.commons.lang3.StringUtils

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.jdbc.impala.{ImpalaSchemaHelper, ImpalaTRowSetGenerator}
import org.apache.kyuubi.engine.jdbc.schema.{JdbcTRowSetGenerator, SchemaHelper}
import org.apache.kyuubi.session.Session
import org.apache.kyuubi.util.JdbcUtils.withCloseable

class ImpalaDialect extends JdbcDialect {

override def getTablesQuery(
catalog: String,
schema: String,
tableName: String,
tableTypes: util.List[String]): String = {
if (isPattern(schema)) {
throw KyuubiSQLException.featureNotSupported("Pattern-like schema names not supported")
}

val query = new StringBuilder("show tables ")

if (StringUtils.isNotEmpty(schema) && !isWildcardSetByKyuubi(schema)) {
query.append(s"in $schema ")
}

if (StringUtils.isNotEmpty(tableName)) {
query.append(s"like '${toImpalaRegex(tableName)}'")
// The JDBC engine talks to Impalad via `KyuubiHiveDriver` (see `ImpalaConnectionProvider`,
// chosen for its fixed `getMoreResults()` behavior). `KyuubiConnection#setSchema` ships a
// Kyuubi-private session config (`kyuubi.operation.set.current.database`) that Impalad
// does not recognize and rejects with "Invalid query option". Issue `USE` directly so the
// database switch lands as plain Impala SQL the backend understands.
override def setSchema(conn: Connection, schema: String): Unit = {
val escaped = schema.replace("`", "``")
withCloseable(conn.createStatement()) { stmt =>
stmt.execute(s"USE `$escaped`")
}

query.toString()
}

override def getColumnsQuery(
session: Session,
catalogName: String,
schemaName: String,
tableName: String,
columnName: String): String = {
if (StringUtils.isEmpty(tableName)) {
throw KyuubiSQLException("Table name should not be empty")
}

if (isPattern(schemaName)) {
throw KyuubiSQLException.featureNotSupported("Pattern-like schema names not supported")
}

if (isPattern(tableName)) {
throw KyuubiSQLException.featureNotSupported("Pattern-like table names not supported")
// Symmetric to `setSchema`: `KyuubiConnection#getSchema` ships a Kyuubi-private session
// config (`kyuubi.operation.get.current.database`) that Impalad rejects. Read the current
// database via plain SQL.
override def getCurrentSchema(conn: Connection): String = {
withCloseable(conn.createStatement()) { stmt =>
withCloseable(stmt.executeQuery("SELECT current_database()")) { rs =>
if (rs.next()) rs.getString(1) else null
}
}

val query = new StringBuilder("show column stats ")

if (StringUtils.isNotEmpty(schemaName) && !isWildcardSetByKyuubi(schemaName)) {
query.append(s"$schemaName.")
}

query.append(tableName)
query.toString()
}

override def getTRowSetGenerator(): JdbcTRowSetGenerator = new ImpalaTRowSetGenerator

override def getSchemaHelper(): SchemaHelper = new ImpalaSchemaHelper

override def name(): String = "impala"

private def isPattern(value: String): Boolean = {
value != null && !isWildcardSetByKyuubi(value) && value.contains("*")
}

private def isWildcardSetByKyuubi(pattern: String): Boolean = pattern == "%"

private def toImpalaRegex(pattern: String): String = {
pattern.replace("%", "*")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,12 @@
package org.apache.kyuubi.engine.jdbc.dialect

import java.sql.{Connection, ResultSet, Statement}
import java.util

import org.apache.kyuubi.{KyuubiException, KyuubiSQLException, Logging}
import org.apache.kyuubi.{KyuubiException, Logging}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_JDBC_CONNECTION_URL, ENGINE_JDBC_SHORT_NAME}
import org.apache.kyuubi.engine.jdbc.schema.{JdbcTRowSetGenerator, SchemaHelper}
import org.apache.kyuubi.engine.jdbc.util.SupportServiceLoader
import org.apache.kyuubi.operation.Operation
import org.apache.kyuubi.session.Session
import org.apache.kyuubi.util.reflect.ReflectUtils._

abstract class JdbcDialect extends SupportServiceLoader with Logging {
Expand All @@ -37,46 +34,75 @@ abstract class JdbcDialect extends SupportServiceLoader with Logging {
statement
}

def getTypeInfoOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}
// --- Connection-level current catalog/schema hooks ---

def getCatalogsOperation(): String = {
throw KyuubiSQLException.featureNotSupported()
}
def getCatalog(conn: Connection): String = conn.getCatalog

def getSchemasOperation(catalog: String, schema: String): String = {
throw KyuubiSQLException.featureNotSupported()
}
def setCatalog(conn: Connection, catalog: String): Unit = conn.setCatalog(catalog)

def getTablesQuery(
catalog: String,
schema: String,
tableName: String,
tableTypes: util.List[String]): String
def getCurrentSchema(conn: Connection): String = conn.getSchema

def getTableTypesOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}
def setSchema(conn: Connection, schema: String): Unit = conn.setSchema(schema)

def getColumnsQuery(
session: Session,
catalogName: String,
schemaName: String,
tableName: String,
columnName: String): String
// --- Metadata hooks; default impl forwards to standard JDBC DatabaseMetaData ---

def getFunctionsOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}
def getTypeInfo(conn: Connection): ResultSet =
conn.getMetaData.getTypeInfo

def getPrimaryKeysOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}
def getCatalogs(conn: Connection): ResultSet =
conn.getMetaData.getCatalogs

def getCrossReferenceOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}
def getSchemas(conn: Connection, catalog: String, schemaPattern: String): ResultSet =
conn.getMetaData.getSchemas(catalog, schemaPattern)

def getTableTypes(conn: Connection): ResultSet =
conn.getMetaData.getTableTypes

def getTables(
conn: Connection,
catalog: String,
schemaPattern: String,
tableNamePattern: String,
types: Array[String]): ResultSet =
conn.getMetaData.getTables(catalog, schemaPattern, tableNamePattern, types)

def getColumns(
conn: Connection,
catalog: String,
schemaPattern: String,
tableNamePattern: String,
columnNamePattern: String): ResultSet =
conn.getMetaData.getColumns(catalog, schemaPattern, tableNamePattern, columnNamePattern)

def getFunctions(
conn: Connection,
catalog: String,
schemaPattern: String,
functionNamePattern: String): ResultSet =
conn.getMetaData.getFunctions(catalog, schemaPattern, functionNamePattern)

def getPrimaryKeys(
conn: Connection,
catalog: String,
schema: String,
table: String): ResultSet =
conn.getMetaData.getPrimaryKeys(catalog, schema, table)

def getCrossReference(
conn: Connection,
primaryCatalog: String,
primarySchema: String,
primaryTable: String,
foreignCatalog: String,
foreignSchema: String,
foreignTable: String): ResultSet =
conn.getMetaData.getCrossReference(
primaryCatalog,
primarySchema,
primaryTable,
foreignCatalog,
foreignSchema,
foreignTable)

def getTRowSetGenerator(): JdbcTRowSetGenerator

Expand Down
Loading
Loading