File tree Expand file tree Collapse file tree
externals/kyuubi-jdbc-engine/src
main/scala/org/apache/kyuubi/engine/jdbc/operation
test/scala/org/apache/kyuubi/engine/jdbc
kyuubi-common/src/test/scala/org/apache/kyuubi/operation Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -129,6 +129,8 @@ class ExecuteStatement(
129129 if (jdbcStatement != null ) {
130130 dialect.cancelStatement(jdbcStatement)
131131 jdbcStatement = null
132+ } else {
133+ warn(s " The cancel operation $statementId might be ignore due to jdbcStatement is null. " )
132134 }
133135 }
134136 }
Original file line number Diff line number Diff line change @@ -23,6 +23,7 @@ import org.apache.kyuubi.config.KyuubiConf
2323import org .apache .kyuubi .engine .jdbc .connection .ConnectionProvider
2424import org .apache .kyuubi .operation .HiveJDBCTestHelper
2525import org .apache .kyuubi .shaded .hive .service .rpc .thrift ._
26+ import org .apache .kyuubi .shaded .hive .service .rpc .thrift .TOperationState .{CANCELED_STATE , RUNNING_STATE }
2627
2728class OperationWithEngineSuite extends MySQLOperationSuite with HiveJDBCTestHelper {
2829
@@ -90,13 +91,22 @@ class OperationWithEngineSuite extends MySQLOperationSuite with HiveJDBCTestHelp
9091 val tExecuteStatementResp = client.ExecuteStatement (tExecuteStatementReq)
9192 assert(tExecuteStatementResp.getStatus.getStatusCode === TStatusCode .SUCCESS_STATUS )
9293
93- Thread .sleep(1000 ) // wait for statement to start executing
94+ assertOperationStatusIn(
95+ client,
96+ tExecuteStatementResp.getOperationHandle,
97+ Set (RUNNING_STATE ),
98+ 5 )
9499
95100 val tCancelOperationReq = new TCancelOperationReq ()
96101 tCancelOperationReq.setOperationHandle(tExecuteStatementResp.getOperationHandle)
97102 val TCancelOperationReq = client.CancelOperation (tCancelOperationReq)
98103 assert(TCancelOperationReq .getStatus.getStatusCode === TStatusCode .SUCCESS_STATUS )
99- // If the statement is not cancelled successfully, will block here until 120s
104+
105+ assertOperationStatusIn(
106+ client,
107+ tExecuteStatementResp.getOperationHandle,
108+ Set (CANCELED_STATE ),
109+ 5 )
100110 }
101111 }
102112 }
Original file line number Diff line number Diff line change @@ -24,6 +24,7 @@ import org.apache.kyuubi.config.KyuubiConf
2424import org .apache .kyuubi .engine .jdbc .connection .ConnectionProvider
2525import org .apache .kyuubi .operation .HiveJDBCTestHelper
2626import org .apache .kyuubi .shaded .hive .service .rpc .thrift ._
27+ import org .apache .kyuubi .shaded .hive .service .rpc .thrift .TOperationState .{CANCELED_STATE , RUNNING_STATE }
2728
2829class StarRocksOperationWithEngineSuite extends StarRocksOperationSuite with HiveJDBCTestHelper {
2930
@@ -90,14 +91,23 @@ class StarRocksOperationWithEngineSuite extends StarRocksOperationSuite with Hiv
9091 tExecuteStatementReq.setRunAsync(true )
9192 val tExecuteStatementResp = client.ExecuteStatement (tExecuteStatementReq)
9293
93- Thread .sleep(1000 ) // wait for statement to start executing
94+ assertOperationStatusIn(
95+ client,
96+ tExecuteStatementResp.getOperationHandle,
97+ Set (RUNNING_STATE ),
98+ 5 )
9499
95100 val tCancelOperationReq = new TCancelOperationReq ()
96101 tCancelOperationReq.setOperationHandle(tExecuteStatementResp.getOperationHandle)
97102
98103 val tFetchResultsResp = client.CancelOperation (tCancelOperationReq)
99104 assert(tFetchResultsResp.getStatus.getStatusCode === TStatusCode .SUCCESS_STATUS )
100- // If the statement is not cancelled successfully, will block here until 120s
105+
106+ assertOperationStatusIn(
107+ client,
108+ tExecuteStatementResp.getOperationHandle,
109+ Set (CANCELED_STATE ),
110+ 5 )
101111 }
102112 }
103113 }
Original file line number Diff line number Diff line change @@ -124,11 +124,27 @@ trait HiveJDBCTestHelper extends JDBCTestHelper {
124124 }
125125
126126 def waitForOperationToComplete (client : Iface , op : TOperationHandle ): Unit = {
127- val req = new TGetOperationStatusReq (op)
128- var state = client.GetOperationStatus (req).getOperationState
129- eventually(timeout(90 .seconds), interval(100 .milliseconds)) {
130- state = client.GetOperationStatus (req).getOperationState
131- assert(! Set (INITIALIZED_STATE , PENDING_STATE , RUNNING_STATE ).contains(state))
127+ assertOperationStatusIn(
128+ client,
129+ op,
130+ Set (
131+ FINISHED_STATE ,
132+ CANCELED_STATE ,
133+ CLOSED_STATE ,
134+ ERROR_STATE ,
135+ UKNOWN_STATE ,
136+ TIMEDOUT_STATE ),
137+ 90 )
138+ }
139+
140+ def assertOperationStatusIn (
141+ client : Iface ,
142+ op : TOperationHandle ,
143+ status : Set [TOperationState ],
144+ timeoutInSeconds : Int ): Unit = {
145+ eventually(timeout(timeoutInSeconds.seconds), interval(100 .milliseconds)) {
146+ val state = client.GetOperationStatus (new TGetOperationStatusReq (op)).getOperationState
147+ assert(status.contains(state))
132148 }
133149 }
134150
You can’t perform that action at this time.
0 commit comments