Skip to content

Commit d927966

Browse files
[KYUUBI #7305] Replace higher-order function operations with concrete classes
Convert the metadata and lifecycle operations from higher-order-function constructor parameters to concrete subclasses, matching the per-operation file style of the spark/hive engines. ExecuteMetaDataOperation becomes an abstract base whose subclasses (GetTypeInfo, GetCatalogs, GetSchemas, GetTables, GetTableTypes, GetColumns, GetFunctions, GetPrimaryKeys, GetCrossReference) each fix the metadata RPC; the four lifecycle classes (SetCurrentCatalog/Database, GetCurrentCatalog/Database) likewise own the catalog/database parameter directly and call the dialect from runInternal. JdbcOperationManager's factory methods collapse to one-liners.
1 parent 94883e4 commit d927966

15 files changed

Lines changed: 321 additions & 106 deletions

externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteMetaDataOperation.scala

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,12 @@ import org.apache.kyuubi.session.Session
3131
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TFetchResultsResp
3232

3333
/**
34-
* Runs a `DatabaseMetaData.getXxx(...)` call against the session connection and exposes the
35-
* resulting [[ResultSet]] as the operation output.
34+
* Base class that streams the result of a `DatabaseMetaData.getXxx(...)` call against the
35+
* session connection. Subclasses fix the particular metadata RPC via [[runMetaDataCall]].
3636
*/
37-
class ExecuteMetaDataOperation(
38-
session: Session,
39-
metadataCall: ExecuteMetaDataOperation.MetaDataCall)
40-
extends JdbcOperation(session) {
37+
abstract class ExecuteMetaDataOperation(session: Session) extends JdbcOperation(session) {
38+
39+
protected def runMetaDataCall(dialect: JdbcDialect, conn: Connection): ResultSet
4140

4241
private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
4342
override def getOperationLog: Option[OperationLog] = Option(operationLog)
@@ -48,7 +47,7 @@ class ExecuteMetaDataOperation(
4847
setState(OperationState.RUNNING)
4948
try {
5049
val connection = session.asInstanceOf[JdbcSessionImpl].sessionConnection
51-
val resultSet = metadataCall(dialect, connection)
50+
val resultSet = runMetaDataCall(dialect, connection)
5251
val iterator = new ResultSetFetchIterator(resultSet)
5352
try {
5453
// Normalise spec-divergent column labels (e.g. Impala HS2 TABLE_MD -> TABLE_SCHEM).
@@ -118,7 +117,3 @@ class ExecuteMetaDataOperation(
118117
}
119118
}
120119
}
121-
122-
object ExecuteMetaDataOperation {
123-
type MetaDataCall = (JdbcDialect, Connection) => ResultSet
124-
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kyuubi.engine.jdbc.operation
18+
19+
import java.sql.{Connection, ResultSet}
20+
21+
import org.apache.kyuubi.engine.jdbc.dialect.JdbcDialect
22+
import org.apache.kyuubi.session.Session
23+
24+
class GetCatalogs(session: Session) extends ExecuteMetaDataOperation(session) {
25+
override protected def runMetaDataCall(dialect: JdbcDialect, conn: Connection): ResultSet =
26+
dialect.getCatalogs(conn)
27+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kyuubi.engine.jdbc.operation
18+
19+
import java.sql.{Connection, ResultSet}
20+
21+
import org.apache.kyuubi.engine.jdbc.dialect.JdbcDialect
22+
import org.apache.kyuubi.session.Session
23+
24+
class GetColumns(
25+
session: Session,
26+
catalog: String,
27+
schema: String,
28+
tableName: String,
29+
columnName: String)
30+
extends ExecuteMetaDataOperation(session) {
31+
override protected def runMetaDataCall(dialect: JdbcDialect, conn: Connection): ResultSet =
32+
dialect.getColumns(conn, catalog, schema, tableName, columnName)
33+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kyuubi.engine.jdbc.operation
18+
19+
import java.sql.{Connection, ResultSet}
20+
21+
import org.apache.kyuubi.engine.jdbc.dialect.JdbcDialect
22+
import org.apache.kyuubi.session.Session
23+
24+
class GetCrossReference(
25+
session: Session,
26+
primaryCatalog: String,
27+
primarySchema: String,
28+
primaryTable: String,
29+
foreignCatalog: String,
30+
foreignSchema: String,
31+
foreignTable: String)
32+
extends ExecuteMetaDataOperation(session) {
33+
override protected def runMetaDataCall(dialect: JdbcDialect, conn: Connection): ResultSet =
34+
dialect.getCrossReference(
35+
conn,
36+
primaryCatalog,
37+
primarySchema,
38+
primaryTable,
39+
foreignCatalog,
40+
foreignSchema,
41+
foreignTable)
42+
}

externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/GetCurrentCatalog.scala

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,20 @@
1616
*/
1717
package org.apache.kyuubi.engine.jdbc.operation
1818

19-
import java.sql.{Connection, Types}
19+
import java.sql.Types
2020

21-
import org.apache.kyuubi.engine.jdbc.dialect.JdbcDialect
2221
import org.apache.kyuubi.engine.jdbc.schema.{Column, Row, Schema}
2322
import org.apache.kyuubi.engine.jdbc.session.JdbcSessionImpl
2423
import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationState}
2524
import org.apache.kyuubi.session.Session
2625

27-
class GetCurrentCatalog(session: Session, fetch: (JdbcDialect, Connection) => String)
28-
extends JdbcOperation(session) {
26+
class GetCurrentCatalog(session: Session) extends JdbcOperation(session) {
2927

3028
override protected def runInternal(): Unit = {
3129
setState(OperationState.RUNNING)
3230
try {
3331
val connection = session.asInstanceOf[JdbcSessionImpl].sessionConnection
34-
val catalog = fetch(dialect, connection)
32+
val catalog = dialect.getCatalog(connection)
3533
schema = Schema(List(Column(
3634
"TABLE_CAT",
3735
"VARCHAR",

externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/GetCurrentDatabase.scala

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,20 @@
1616
*/
1717
package org.apache.kyuubi.engine.jdbc.operation
1818

19-
import java.sql.{Connection, Types}
19+
import java.sql.Types
2020

21-
import org.apache.kyuubi.engine.jdbc.dialect.JdbcDialect
2221
import org.apache.kyuubi.engine.jdbc.schema.{Column, Row, Schema}
2322
import org.apache.kyuubi.engine.jdbc.session.JdbcSessionImpl
2423
import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationState}
2524
import org.apache.kyuubi.session.Session
2625

27-
class GetCurrentDatabase(session: Session, fetch: (JdbcDialect, Connection) => String)
28-
extends JdbcOperation(session) {
26+
class GetCurrentDatabase(session: Session) extends JdbcOperation(session) {
2927

3028
override protected def runInternal(): Unit = {
3129
setState(OperationState.RUNNING)
3230
try {
3331
val connection = session.asInstanceOf[JdbcSessionImpl].sessionConnection
34-
val database = fetch(dialect, connection)
32+
val database = dialect.getCurrentSchema(connection)
3533
schema = Schema(List(Column(
3634
"TABLE_SCHEM",
3735
"VARCHAR",
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kyuubi.engine.jdbc.operation
18+
19+
import java.sql.{Connection, ResultSet}
20+
21+
import org.apache.kyuubi.engine.jdbc.dialect.JdbcDialect
22+
import org.apache.kyuubi.session.Session
23+
24+
class GetFunctions(session: Session, catalog: String, schema: String, functionName: String)
25+
extends ExecuteMetaDataOperation(session) {
26+
override protected def runMetaDataCall(dialect: JdbcDialect, conn: Connection): ResultSet =
27+
dialect.getFunctions(conn, catalog, schema, functionName)
28+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kyuubi.engine.jdbc.operation
18+
19+
import java.sql.{Connection, ResultSet}
20+
21+
import org.apache.kyuubi.engine.jdbc.dialect.JdbcDialect
22+
import org.apache.kyuubi.session.Session
23+
24+
class GetPrimaryKeys(session: Session, catalog: String, schema: String, tableName: String)
25+
extends ExecuteMetaDataOperation(session) {
26+
override protected def runMetaDataCall(dialect: JdbcDialect, conn: Connection): ResultSet =
27+
dialect.getPrimaryKeys(conn, catalog, schema, tableName)
28+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kyuubi.engine.jdbc.operation
18+
19+
import java.sql.{Connection, ResultSet}
20+
21+
import org.apache.kyuubi.engine.jdbc.dialect.JdbcDialect
22+
import org.apache.kyuubi.session.Session
23+
24+
class GetSchemas(session: Session, catalog: String, schema: String)
25+
extends ExecuteMetaDataOperation(session) {
26+
override protected def runMetaDataCall(dialect: JdbcDialect, conn: Connection): ResultSet =
27+
dialect.getSchemas(conn, catalog, schema)
28+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kyuubi.engine.jdbc.operation
18+
19+
import java.sql.{Connection, ResultSet}
20+
21+
import org.apache.kyuubi.engine.jdbc.dialect.JdbcDialect
22+
import org.apache.kyuubi.session.Session
23+
24+
class GetTableTypes(session: Session) extends ExecuteMetaDataOperation(session) {
25+
override protected def runMetaDataCall(dialect: JdbcDialect, conn: Connection): ResultSet =
26+
dialect.getTableTypes(conn)
27+
}

0 commit comments

Comments
 (0)