Skip to content

Commit 291cfd1

Browse files
authored
HIVE-29530: Hive's explicit transaction doesn't commit (#6395)
1 parent bbd83df commit 291cfd1

File tree

5 files changed

+123
-9
lines changed

5 files changed

+123
-9
lines changed

iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTxnCoordinator.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.hadoop.conf.Configuration;
2828
import org.apache.hadoop.hive.TxnCoordinator;
2929
import org.apache.hadoop.hive.common.StatsSetupConst;
30+
import org.apache.hadoop.hive.conf.HiveConf;
3031
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
3132
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
3233
import org.apache.hadoop.hive.metastore.api.Table;
@@ -69,6 +70,7 @@ public class HiveTxnCoordinator implements TxnCoordinator {
6970

7071
public HiveTxnCoordinator(Configuration conf, IMetaStoreClient msClient, boolean isExplicitTransaction) {
7172
this.conf = conf;
73+
HiveConf.setBoolVar(conf, HiveConf.ConfVars.TXN_WRITE_X_LOCK, true);
7274
this.msClient = msClient;
7375
this.isExplicitTransaction = isExplicitTransaction;
7476
}

iceberg/iceberg-handler/src/test/queries/positive/iceberg_multi_table_txn.q

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,5 +50,26 @@ commit;
5050
select * from iceberg_txn_t1 order by a;
5151
select * from iceberg_txn_t2 order by a;
5252

53+
-- Test with ext.locking.enabled
54+
55+
set iceberg.engine.hive.lock-enabled=false;
56+
set hive.txn.ext.locking.enabled=true;
57+
58+
from (
59+
select 1 as a union all select 2
60+
) s
61+
insert into iceberg_txn_t1
62+
select a
63+
insert into iceberg_txn_t2
64+
select a + 10;
65+
66+
start transaction;
67+
update iceberg_txn_t1 set a = a + 1;
68+
insert into iceberg_txn_t2 select * from iceberg_txn_t1;
69+
commit;
70+
71+
select * from iceberg_txn_t1 order by a;
72+
select * from iceberg_txn_t2 order by a;
73+
5374
drop table if exists iceberg_txn_t1;
5475
drop table if exists iceberg_txn_t2;

iceberg/iceberg-handler/src/test/results/positive/iceberg_multi_table_txn.q.out

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,90 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
209209
4
210210
5
211211
6
212+
PREHOOK: query: from (
213+
select 1 as a union all select 2
214+
) s
215+
insert into iceberg_txn_t1
216+
select a
217+
insert into iceberg_txn_t2
218+
select a + 10
219+
PREHOOK: type: QUERY
220+
PREHOOK: Input: _dummy_database@_dummy_table
221+
PREHOOK: Output: default@iceberg_txn_t1
222+
PREHOOK: Output: default@iceberg_txn_t2
223+
POSTHOOK: query: from (
224+
select 1 as a union all select 2
225+
) s
226+
insert into iceberg_txn_t1
227+
select a
228+
insert into iceberg_txn_t2
229+
select a + 10
230+
POSTHOOK: type: QUERY
231+
POSTHOOK: Input: _dummy_database@_dummy_table
232+
POSTHOOK: Output: default@iceberg_txn_t1
233+
POSTHOOK: Output: default@iceberg_txn_t2
234+
PREHOOK: query: start transaction
235+
PREHOOK: type: START TRANSACTION
236+
POSTHOOK: query: start transaction
237+
POSTHOOK: type: START TRANSACTION
238+
PREHOOK: query: update iceberg_txn_t1 set a = a + 1
239+
PREHOOK: type: QUERY
240+
PREHOOK: Input: default@iceberg_txn_t1
241+
PREHOOK: Output: default@iceberg_txn_t1
242+
PREHOOK: Output: default@iceberg_txn_t1
243+
POSTHOOK: query: update iceberg_txn_t1 set a = a + 1
244+
POSTHOOK: type: QUERY
245+
POSTHOOK: Input: default@iceberg_txn_t1
246+
POSTHOOK: Output: default@iceberg_txn_t1
247+
POSTHOOK: Output: default@iceberg_txn_t1
248+
PREHOOK: query: insert into iceberg_txn_t2 select * from iceberg_txn_t1
249+
PREHOOK: type: QUERY
250+
PREHOOK: Input: default@iceberg_txn_t1
251+
PREHOOK: Output: default@iceberg_txn_t2
252+
POSTHOOK: query: insert into iceberg_txn_t2 select * from iceberg_txn_t1
253+
POSTHOOK: type: QUERY
254+
POSTHOOK: Input: default@iceberg_txn_t1
255+
POSTHOOK: Output: default@iceberg_txn_t2
256+
PREHOOK: query: commit
257+
PREHOOK: type: COMMIT
258+
POSTHOOK: query: commit
259+
POSTHOOK: type: COMMIT
260+
PREHOOK: query: select * from iceberg_txn_t1 order by a
261+
PREHOOK: type: QUERY
262+
PREHOOK: Input: default@iceberg_txn_t1
263+
PREHOOK: Output: hdfs://### HDFS PATH ###
264+
POSTHOOK: query: select * from iceberg_txn_t1 order by a
265+
POSTHOOK: type: QUERY
266+
POSTHOOK: Input: default@iceberg_txn_t1
267+
POSTHOOK: Output: hdfs://### HDFS PATH ###
268+
2
269+
3
270+
4
271+
5
272+
6
273+
7
274+
PREHOOK: query: select * from iceberg_txn_t2 order by a
275+
PREHOOK: type: QUERY
276+
PREHOOK: Input: default@iceberg_txn_t2
277+
PREHOOK: Output: hdfs://### HDFS PATH ###
278+
POSTHOOK: query: select * from iceberg_txn_t2 order by a
279+
POSTHOOK: type: QUERY
280+
POSTHOOK: Input: default@iceberg_txn_t2
281+
POSTHOOK: Output: hdfs://### HDFS PATH ###
282+
11
283+
12
284+
12
285+
13
286+
2
287+
3
288+
3
289+
4
290+
4
291+
5
292+
5
293+
6
294+
6
295+
7
212296
PREHOOK: query: drop table if exists iceberg_txn_t1
213297
PREHOOK: type: DROPTABLE
214298
PREHOOK: Input: default@iceberg_txn_t1

ql/src/java/org/apache/hadoop/hive/ql/DriverTxnHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -584,10 +584,12 @@ void destroy(String queryIdFromDriver) {
584584

585585
boolean isTxnOpen =
586586
txnManager != null && txnManager.isTxnOpen() &&
587-
(txnManager.isImplicitTransactionOpen(context) || COMMIT_OR_ROLLBACK.contains(hiveOp)) &&
588587
org.apache.commons.lang3.StringUtils.equals(queryIdFromDriver, txnManager.getQueryid());
589588

590-
release(!hiveLocks.isEmpty() || isTxnOpen);
589+
boolean isImplicitTxnOrCommit = isTxnOpen &&
590+
(txnManager.isImplicitTransactionOpen(context) || COMMIT_OR_ROLLBACK.contains(hiveOp));
591+
592+
release((!hiveLocks.isEmpty() && !isTxnOpen) || isImplicitTxnOrCommit);
591593
}
592594

593595
private void release(boolean releaseLocks) {

ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,7 @@ private void resetTxnInfo() {
528528
lockMgr = null;
529529
}
530530
txnId = 0;
531+
isExplicitTransaction = false;
531532
txnCoordinator = null;
532533
stmtId = -1;
533534
numStatements = 0;
@@ -601,6 +602,9 @@ public void commitTxn() throws LockException {
601602
clearLocksAndHB();
602603
getTxnCoordinator().commit();
603604

605+
if (isExplicitTxnOpen()) {
606+
getDefaultCoordinator().commit();
607+
}
604608
} catch (NoSuchTxnException e) {
605609
LOG.error("Metastore could not find " + JavaUtils.txnIdToString(txnId));
606610
throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(txnId));
@@ -625,6 +629,9 @@ public void rollbackTxn() throws LockException {
625629
clearLocksAndHB();
626630
getTxnCoordinator().rollback();
627631

632+
if (isExplicitTxnOpen()) {
633+
getDefaultCoordinator().rollback();
634+
}
628635
} catch (NoSuchTxnException e) {
629636
LOG.error("Metastore could not find " + JavaUtils.txnIdToString(txnId));
630637
throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(txnId));
@@ -913,13 +920,7 @@ public boolean isImplicitTransactionOpen(Context ctx) {
913920
//some commands like "show databases" don't start implicit transactions
914921
return false;
915922
}
916-
if (!isExplicitTransaction) {
917-
if (ctx == null || !ctx.isExplainSkipExecution()) {
918-
assert numStatements <= 1 : "numStatements=" + numStatements;
919-
}
920-
return true;
921-
}
922-
return false;
923+
return !isExplicitTransaction;
923924
}
924925

925926
@Override
@@ -987,6 +988,10 @@ public boolean isTxnOpen() {
987988
txnCoordinator != null && txnCoordinator.hasPendingWork();
988989
}
989990

991+
private boolean isExplicitTxnOpen() {
992+
return isExplicitTransaction && txnId > 0;
993+
}
994+
990995
@Override
991996
public long getCurrentTxnId() {
992997
return txnId;

0 commit comments

Comments
 (0)