This is an automated email from the ASF dual-hosted git repository. dkuzmenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 56e59ed HIVE-23340: TxnHandler cleanup (Peter Varga, reviewed by Denys Kuzmenko) 56e59ed is described below commit 56e59ed6a7c6f430b8ce0f8ac85bd936eabcbda4 Author: Peter Varga <pva...@cloudera.com> AuthorDate: Fri Jun 12 14:22:36 2020 +0200 HIVE-23340: TxnHandler cleanup (Peter Varga, reviewed by Denys Kuzmenko) --- .../hive/metastore/txn/CompactionTxnHandler.java | 17 +- .../apache/hadoop/hive/metastore/txn/OpenTxn.java | 110 ++++++++ .../hadoop/hive/metastore/txn/OpenTxnList.java | 78 ++++++ .../hadoop/hive/metastore/txn/OperationType.java | 65 +++++ .../hadoop/hive/metastore/txn/TxnHandler.java | 305 ++++++--------------- .../hadoop/hive/metastore/txn/TxnStatus.java | 65 +++++ 6 files changed, 410 insertions(+), 230 deletions(-) diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index d2efc59..b1bf10a 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -37,6 +37,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; + /** * Extends the transaction handler with methods needed only by the compactor threads. These * methods are not available through the thrift interface. 
@@ -109,7 +110,7 @@ class CompactionTxnHandler extends TxnHandler { final String sCheckAborted = "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\"," + "MIN(\"TXN_STARTED\"), COUNT(*)" + "FROM \"TXNS\", \"TXN_COMPONENTS\" " - + "WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = '" + TXN_ABORTED + "' " + + "WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = " + TxnStatus.ABORTED + " " + "GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\"" + (checkAbortedTimeThreshold ? "" : " HAVING COUNT(*) > " + abortedThreshold); @@ -400,7 +401,7 @@ class CompactionTxnHandler extends TxnHandler { * See {@link ql.txn.compactor.Cleaner.removeFiles()} */ s = "SELECT DISTINCT \"TXN_ID\" FROM \"TXNS\", \"TXN_COMPONENTS\" WHERE \"TXN_ID\" = \"TC_TXNID\" " - + "AND \"TXN_STATE\" = '" + TXN_ABORTED + "' AND \"TC_DATABASE\" = ? AND \"TC_TABLE\" = ?"; + + "AND \"TXN_STATE\" = " + TxnStatus.ABORTED + " AND \"TC_DATABASE\" = ? AND \"TC_TABLE\" = ?"; if (info.highestWriteId != 0) s += " AND \"TC_WRITEID\" <= ?"; if (info.partName != null) s += " AND \"TC_PARTITION\" = ?"; @@ -513,8 +514,8 @@ class CompactionTxnHandler extends TxnHandler { "UNION " + "SELECT MIN(\"WS_COMMIT_ID\") AS \"ID\" FROM \"WRITE_SET\" " + "UNION " + - "SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_ABORTED) + - " OR \"TXN_STATE\" = " + quoteChar(TXN_OPEN) + + "SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + TxnStatus.ABORTED + + " OR \"TXN_STATE\" = " + TxnStatus.OPEN + ") \"RES\""; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); @@ -576,7 +577,7 @@ class CompactionTxnHandler extends TxnHandler { String s = "SELECT \"TXN_ID\" FROM \"TXNS\" WHERE " + "\"TXN_ID\" NOT IN (SELECT \"TC_TXNID\" FROM \"TXN_COMPONENTS\") AND " + - " (\"TXN_STATE\" = '" + TXN_ABORTED + "' OR \"TXN_STATE\" = '" + TXN_COMMITTED + "') AND " + " (\"TXN_STATE\" = " + TxnStatus.ABORTED + " OR \"TXN_STATE\" = " + TxnStatus.COMMITTED + ") AND " + " 
\"TXN_ID\" < " + lowWaterMark; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); @@ -811,7 +812,7 @@ class CompactionTxnHandler extends TxnHandler { quoteString(ci.tableName) + "," + (ci.partName == null ? "" : quoteString(ci.partName) + ",") + ci.highestWriteId + ", " + - quoteChar(OperationType.COMPACT.getSqlConst()) + ")"; + OperationType.COMPACT + ")"; if(LOG.isDebugEnabled()) { LOG.debug("About to execute: " + sqlText); } @@ -1147,7 +1148,7 @@ class CompactionTxnHandler extends TxnHandler { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String query = "SELECT COUNT(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN); + String query = "SELECT COUNT(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + TxnStatus.OPEN; LOG.debug("Going to execute query <" + query + ">"); rs = stmt.executeQuery(query); if (!rs.next()) { @@ -1156,7 +1157,7 @@ class CompactionTxnHandler extends TxnHandler { long numOpenTxns = rs.getLong(1); if (numOpenTxns > 0) { query = "SELECT MIN(\"RES\".\"ID\") FROM (" + - "SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN) + + "SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + TxnStatus.OPEN + " UNION " + "SELECT MAX(\"CQ_NEXT_TXN_ID\") AS \"ID\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_STATE\" = " + quoteChar(READY_FOR_CLEANING) + diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/OpenTxn.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/OpenTxn.java new file mode 100644 index 0000000..8ef5fa1 --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/OpenTxn.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.txn; + +import org.apache.hadoop.hive.metastore.api.TxnInfo; +import org.apache.hadoop.hive.metastore.api.TxnType; + +/** + * Class to represent one row in the TXNS table. + */ +public class OpenTxn { + + public static final String OPEN_TXNS_QUERY = "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_TYPE\", " + + "(%s - \"TXN_STARTED\") FROM \"TXNS\" ORDER BY \"TXN_ID\""; + public static final String OPEN_TXNS_INFO_QUERY = "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_TYPE\", " + + "(%s - \"TXN_STARTED\"), \"TXN_USER\", \"TXN_HOST\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\" " + + "FROM \"TXNS\" ORDER BY \"TXN_ID\""; + + private long txnId; + private TxnStatus status; + private TxnType type; + private long startedTime; + private long lastHeartBeatTime; + private String user; + private String host; + + public OpenTxn(long txnId, TxnStatus status, TxnType type) { + this.txnId = txnId; + this.status = status; + this.type = type; + } + + public TxnInfo toTxnInfo() { + TxnInfo info = new TxnInfo(getTxnId(), getStatus().toTxnState(), getUser(), getHost()); + info.setStartedTime(getStartedTime()); + info.setLastHeartbeatTime(getLastHeartBeatTime()); + return info; + } + + public long getTxnId() { + return txnId; + } + + public void setTxnId(long txnId) { + 
this.txnId = txnId; + } + + public TxnStatus getStatus() { + return status; + } + + public void setStatus(TxnStatus status) { + this.status = status; + } + + public TxnType getType() { + return type; + } + + public void setType(TxnType type) { + this.type = type; + } + + public long getStartedTime() { + return startedTime; + } + + public void setStartedTime(long startedTime) { + this.startedTime = startedTime; + } + + public long getLastHeartBeatTime() { + return lastHeartBeatTime; + } + + public void setLastHeartBeatTime(long lastHeartBeatTime) { + this.lastHeartBeatTime = lastHeartBeatTime; + } + + public String getUser() { + return user; + } + + public void setUser(String user) { + this.user = user; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } +} diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/OpenTxnList.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/OpenTxnList.java new file mode 100644 index 0000000..1ed5759 --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/OpenTxnList.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.txn; + +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.TxnType; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; + +import static java.util.stream.Collectors.toList; +import static org.apache.hadoop.hive.metastore.txn.TxnStatus.ABORTED; +import static org.apache.hadoop.hive.metastore.txn.TxnStatus.OPEN; + +/** + * Class for the getOpenTxnList calculation. + */ +public class OpenTxnList { + private long hwm; + private List<OpenTxn> openTxnList; + + public OpenTxnList(long hwm, List<OpenTxn> openTxnList) { + this.hwm = hwm; + this.openTxnList = openTxnList; + } + + public GetOpenTxnsInfoResponse toOpenTxnsInfoResponse() { + return new GetOpenTxnsInfoResponse(getHwm(), openTxnList.stream().map(OpenTxn::toTxnInfo).collect(toList())); + } + public GetOpenTxnsResponse toOpenTxnsResponse() { + List<Long> openList = new ArrayList<>(); + long minOpenTxn = Long.MAX_VALUE; + BitSet abortedBits = new BitSet(); + for (OpenTxn openTxn : getOpenTxnList()) { + if (openTxn.getStatus() == OPEN) { + minOpenTxn = Math.min(minOpenTxn, openTxn.getTxnId()); + } + if (openTxn.getType() != TxnType.READ_ONLY) { + openList.add(openTxn.getTxnId()); + if (openTxn.getStatus() == ABORTED) { + abortedBits.set(openList.size() - 1); + } + } + } + ByteBuffer byteBuffer = ByteBuffer.wrap(abortedBits.toByteArray()); + GetOpenTxnsResponse otr = new GetOpenTxnsResponse(getHwm(), openList, byteBuffer); + if (minOpenTxn < Long.MAX_VALUE) { + otr.setMin_open_txn(minOpenTxn); + } + return otr; + } + + public long getHwm() { + return hwm; + } + + public List<OpenTxn> getOpenTxnList() { + return openTxnList; + } +} diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/OperationType.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/OperationType.java new file mode 100644 index 0000000..39cacd2 --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/OperationType.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.txn; + +import org.apache.hadoop.hive.metastore.api.DataOperationType; + +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; + +import static java.util.function.Function.identity; +import static java.util.stream.Collectors.toMap; + +/** + * These are the valid values for TXN_COMPONENTS.TC_OPERATION_TYPE. 
+ */ +public enum OperationType { + SELECT('s', DataOperationType.SELECT), + INSERT('i', DataOperationType.INSERT), + UPDATE('u', DataOperationType.UPDATE), + DELETE('d', DataOperationType.DELETE), + COMPACT('c', null); + + private final char sqlConst; + private final DataOperationType dataOperationType; + + private static final Map<DataOperationType, OperationType> DOT_LOOKUP = + Arrays.stream(OperationType.values()).collect(toMap(OperationType::getDataOperationType, identity())); + + OperationType(char sqlConst, DataOperationType dataOperationType) { + this.sqlConst = sqlConst; + this.dataOperationType = dataOperationType; + } + + public String toString() { + return "'" + getSqlConst() + "'"; + } + + public static OperationType fromDataOperationType(DataOperationType dop) { + return Optional.ofNullable(DOT_LOOKUP.get(dop)).orElseThrow(IllegalArgumentException::new); // ofNullable: a DataOperationType with no mapping must raise IllegalArgumentException; Optional.of would throw NullPointerException before orElseThrow runs + } + + public String getSqlConst() { + return Character.toString(sqlConst); + } + + public DataOperationType getDataOperationType() { + return dataOperationType; + } +} diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 89ddccb..5c1ec5b 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -120,9 +120,7 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.TableValidWriteIds; import org.apache.hadoop.hive.metastore.api.TxnAbortedException; -import org.apache.hadoop.hive.metastore.api.TxnInfo; import org.apache.hadoop.hive.metastore.api.TxnOpenException; -import org.apache.hadoop.hive.metastore.api.TxnState; import
org.apache.hadoop.hive.metastore.api.TxnToWriteId; import org.apache.hadoop.hive.metastore.api.TxnType; import org.apache.hadoop.hive.metastore.api.UnlockRequest; @@ -226,15 +224,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { static final protected char MAJOR_TYPE = 'a'; static final protected char MINOR_TYPE = 'i'; - // Transaction states - protected static final char TXN_ABORTED = 'a'; - protected static final char TXN_OPEN = 'o'; - protected static final char TXN_COMMITTED = 'c'; - private static final char TXN_TMP = '_'; - - //todo: make these like OperationType and remove above char constants - enum TxnStatus {OPEN, ABORTED, COMMITTED, UNKNOWN} + private static final String TXN_TMP_STATE = "_"; // Lock states static final protected char LOCK_ACQUIRED = 'a'; @@ -277,53 +268,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { private List<TransactionalMetaStoreEventListener> transactionalListeners; - /** - * These are the valid values for TXN_COMPONENTS.TC_OPERATION_TYPE - */ - enum OperationType { - SELECT('s'), INSERT('i'), UPDATE('u'), DELETE('d'), COMPACT('c'); - private final char sqlConst; - OperationType(char sqlConst) { - this.sqlConst = sqlConst; - } - public String toString() { - return Character.toString(sqlConst); - } - public static OperationType fromString(char sqlConst) { - switch (sqlConst) { - case 's': - return SELECT; - case 'i': - return INSERT; - case 'u': - return UPDATE; - case 'd': - return DELETE; - case 'c': - return COMPACT; - default: - throw new IllegalArgumentException(quoteChar(sqlConst)); - } - } - public static OperationType fromDataOperationType(DataOperationType dop) { - switch (dop) { - case SELECT: - return OperationType.SELECT; - case INSERT: - return OperationType.INSERT; - case UPDATE: - return OperationType.UPDATE; - case DELETE: - return OperationType.DELETE; - default: - throw new IllegalArgumentException("Unexpected value: " + dop); - } - } - char getSqlConst() { - return 
sqlConst; - } - } - // Maximum number of open transactions that's allowed private static volatile int maxOpenTxns = 0; // Whether number of open transactions reaches the threshold @@ -444,6 +388,16 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { @Override @RetrySemantics.ReadOnly public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { + return getOpenTxnsList(true).toOpenTxnsInfoResponse(); + } + + @Override + @RetrySemantics.ReadOnly + public GetOpenTxnsResponse getOpenTxns() throws MetaException { + return getOpenTxnsList(false).toOpenTxnsResponse(); + } + + private OpenTxnList getOpenTxnsList(boolean infoFields) throws MetaException { try { // We need to figure out the HighWaterMark and the list of open transactions. Connection dbConn = null; @@ -461,14 +415,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { */ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - List<TxnInfo> txnInfos = new ArrayList<>(); - - String s = - "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_USER\", \"TXN_HOST\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", " - + "(" + TxnDbUtil.getEpochFn(dbProduct) + " - \"TXN_STARTED\")" - + "FROM \"TXNS\" ORDER BY \"TXN_ID\""; - LOG.debug("Going to execute query<" + s + ">"); - rs = stmt.executeQuery(s); + List<OpenTxn> txnInfos = new ArrayList<>(); + String txnsQuery = String.format(infoFields ? OpenTxn.OPEN_TXNS_INFO_QUERY : OpenTxn.OPEN_TXNS_QUERY, + TxnDbUtil.getEpochFn(dbProduct)); + LOG.debug("Going to execute query<" + txnsQuery + ">"); + rs = stmt.executeQuery(txnsQuery); /* * We can use the maximum txn_id from the TXNS table as high water mark, since the commitTxn and the Initiator * guarantees, that the transaction with the highest txn_id will never be removed from the TXNS table. 
@@ -481,134 +432,48 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { while (rs.next()) { long txnId = rs.getLong(1); - long age = rs.getLong(7); + long age = rs.getLong(4); hwm = txnId; if (age < getOpenTxnTimeOutMillis()) { // We will consider every gap as an open transaction from the previous txnId openTxnLowBoundary++; while (txnId > openTxnLowBoundary) { // Add an empty open transaction for every missing value - txnInfos.add(new TxnInfo(openTxnLowBoundary, TxnState.OPEN, null, null)); + txnInfos.add(new OpenTxn(openTxnLowBoundary, TxnStatus.OPEN, TxnType.DEFAULT)); + LOG.debug("Open transaction added for missing value in TXNS {}", + JavaUtils.txnIdToString(openTxnLowBoundary)); openTxnLowBoundary++; } } else { openTxnLowBoundary = txnId; } - char c = rs.getString(2).charAt(0); - TxnState state; - switch (c) { - case TXN_COMMITTED: + TxnStatus state = TxnStatus.fromString(rs.getString(2)); + if (state == TxnStatus.COMMITTED) { // This is only here, to avoid adding this txnId as possible gap continue; - - case TXN_ABORTED: - state = TxnState.ABORTED; - break; - - case TXN_OPEN: - state = TxnState.OPEN; - break; - - default: - throw new MetaException("Unexpected transaction state " + c + " found in txns table"); } - TxnInfo txnInfo = new TxnInfo(txnId, state, rs.getString(3), rs.getString(4)); - txnInfo.setStartedTime(rs.getLong(5)); - txnInfo.setLastHeartbeatTime(rs.getLong(6)); + OpenTxn txnInfo = new OpenTxn(txnId, state, TxnType.findByValue(rs.getInt(3))); + if (infoFields) { + txnInfo.setUser(rs.getString(5)); + txnInfo.setHost(rs.getString(6)); + txnInfo.setStartedTime(rs.getLong(7)); + txnInfo.setLastHeartBeatTime(rs.getLong(8)); + } txnInfos.add(txnInfo); } - LOG.debug("Going to rollback"); dbConn.rollback(); - return new GetOpenTxnsInfoResponse(hwm, txnInfos); + LOG.debug("Got OpenTxnList with hwm: {} and openTxnList size {}.", hwm, txnInfos.size()); + return new OpenTxnList(hwm, txnInfos); } catch (SQLException e) { - 
LOG.debug("Going to rollback"); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "getOpenTxnsInfo"); + checkRetryable(dbConn, e, "getOpenTxnsList"); throw new MetaException( "Unable to select from transaction database: " + getMessage(e) + StringUtils.stringifyException(e)); } finally { close(rs, stmt, dbConn); } } catch (RetryException e) { - return getOpenTxnsInfo(); - } - } - - @Override - @RetrySemantics.ReadOnly - public GetOpenTxnsResponse getOpenTxns() throws MetaException { - try { - // We need to figure out the current transaction number and the list of open transactions. - Connection dbConn = null; - Statement stmt = null; - ResultSet rs = null; - try { - /* - * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()} - */ - dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); - - List<Long> openList = new ArrayList<>(); - String s = "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_TYPE\", " - + "(" + TxnDbUtil.getEpochFn(dbProduct) + " - \"TXN_STARTED\")" - + " FROM \"TXNS\" ORDER BY \"TXN_ID\""; - LOG.debug("Going to execute query<" + s + ">"); - rs = stmt.executeQuery(s); - long hwm = 0; - long openTxnLowBoundary = 0; - long minOpenTxn = Long.MAX_VALUE; - BitSet abortedBits = new BitSet(); - while (rs.next()) { - long txnId = rs.getLong(1); - long age = rs.getLong(4); - hwm = txnId; - if (age < getOpenTxnTimeOutMillis()) { - // We will consider every gap as an open transaction from the previous txnId - openTxnLowBoundary++; - while (txnId > openTxnLowBoundary) { - // Add an empty open transaction for every missing value - openList.add(openTxnLowBoundary); - minOpenTxn = Math.min(minOpenTxn, openTxnLowBoundary); - openTxnLowBoundary++; - } - } else { - openTxnLowBoundary = txnId; - } - char txnState = rs.getString(2).charAt(0); - if (txnState == TXN_COMMITTED) { - continue; - } - if (txnState == TXN_OPEN) { - minOpenTxn = Math.min(minOpenTxn, txnId); - } - TxnType txnType = 
TxnType.findByValue(rs.getInt(3)); - if (txnType != TxnType.READ_ONLY) { - openList.add(txnId); - if (txnState == TXN_ABORTED) { - abortedBits.set(openList.size() - 1); - } - } - } - LOG.debug("Going to rollback"); - dbConn.rollback(); - ByteBuffer byteBuffer = ByteBuffer.wrap(abortedBits.toByteArray()); - GetOpenTxnsResponse otr = new GetOpenTxnsResponse(hwm, openList, byteBuffer); - if (minOpenTxn < Long.MAX_VALUE) { - otr.setMin_open_txn(minOpenTxn); - } - return otr; - } catch (SQLException e) { - LOG.debug("Going to rollback"); - rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "getOpenTxns"); - throw new MetaException("Unable to select from transaction database, " - + StringUtils.stringifyException(e)); - } finally { - close(rs, stmt, dbConn); - } - } catch (RetryException e) { - return getOpenTxns(); + return getOpenTxnsList(infoFields); } } @@ -761,7 +626,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { TxnDbUtil.getEpochFn(dbProduct)); LOG.debug("Going to execute insert <" + insertQuery + ">"); try (PreparedStatement ps = dbConn.prepareStatement(insertQuery, new String[] {"TXN_ID"})) { - String state = genKeySupport ? Character.toString(TXN_OPEN) : Character.toString(TXN_TMP); + String state = genKeySupport ? TxnStatus.OPEN.getSqlConst() : TXN_TMP_STATE; if (numTxns == 1) { ps.setString(1, state); ps.setString(2, rqst.getUser()); @@ -838,7 +703,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } else { try (PreparedStatement pstmt = dbConn.prepareStatement("SELECT \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = ?")) { - pstmt.setString(1, Character.toString(TXN_TMP)); + pstmt.setString(1, TXN_TMP_STATE); try (ResultSet rs = pstmt.executeQuery()) { while (rs.next()) { txnIds.add(rs.getLong(1)); @@ -847,8 +712,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } try (PreparedStatement pstmt = dbConn .prepareStatement("UPDATE \"TXNS\" SET \"TXN_STATE\" = ? 
WHERE \"TXN_STATE\" = ?")) { - pstmt.setString(1, Character.toString(TXN_OPEN)); - pstmt.setString(2, Character.toString(TXN_TMP)); + pstmt.setString(1, TxnStatus.OPEN.getSqlConst()); + pstmt.setString(2, TXN_TMP_STATE); pstmt.executeUpdate(); } } @@ -1036,7 +901,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { txnid = targetTxnIds.get(0); } - TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TXN_OPEN); + TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TxnStatus.OPEN); if (txnRecord == null) { TxnStatus status = findTxnState(txnid, stmt); if (status == TxnStatus.ABORTED) { @@ -1091,9 +956,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { stmt = dbConn.createStatement(); List<String> queries = new ArrayList<>(); - StringBuilder prefix = new StringBuilder("SELECT \"TXN_ID\", \"TXN_TYPE\" from \"TXNS\" where \"TXN_STATE\" = ") - .append(quoteChar(TXN_OPEN)).append(" and \"TXN_TYPE\" != ").append(TxnType.READ_ONLY.getValue()) - .append(" and "); + StringBuilder prefix = + new StringBuilder("SELECT \"TXN_ID\", \"TXN_TYPE\" from \"TXNS\" where \"TXN_STATE\" = ") + .append(TxnStatus.OPEN) + .append(" and \"TXN_TYPE\" != ").append(TxnType.READ_ONLY.getValue()).append(" and "); TxnUtils.buildQueryWithINClause(conf, queries, prefix, new StringBuilder(), txnIds, "\"TXN_ID\"", false, false); @@ -1366,7 +1232,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { * should not normally run concurrently (for same txn) but could due to bugs in the client * which could then corrupt internal transaction manager state. Also competes with abortTxn(). 
*/ - TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TXN_OPEN); + TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TxnStatus.OPEN); if (txnRecord == null) { //if here, txn was not found (in expected state) TxnStatus actualTxnStatus = findTxnState(txnid, stmt); @@ -1386,7 +1252,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } String conflictSQLSuffix = "FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\"=" + txnid + " AND \"TC_OPERATION_TYPE\" IN(" + - quoteChar(OperationType.UPDATE.sqlConst) + "," + quoteChar(OperationType.DELETE.sqlConst) + ")"; + OperationType.UPDATE + "," + OperationType.DELETE + ")"; long tempCommitId = generateTemporaryId(); if (txnRecord.type != TxnType.READ_ONLY @@ -1549,10 +1415,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { //The same happens when Hive splits U=I+D early so it looks like 2 branches of a //multi-insert stmt (an Insert and a Delete branch). It also 'feels' // un-serializable to allow concurrent deletes - " and (\"COMMITTED\".\"WS_OPERATION_TYPE\" IN(" + quoteChar(OperationType.UPDATE.sqlConst) + - ", " + quoteChar(OperationType.DELETE.sqlConst) + - ") AND \"CUR\".\"WS_OPERATION_TYPE\" IN(" + quoteChar(OperationType.UPDATE.sqlConst) + ", " - + quoteChar(OperationType.DELETE.sqlConst) + "))"); + " and (\"COMMITTED\".\"WS_OPERATION_TYPE\" IN(" + OperationType.UPDATE + + ", " + OperationType.DELETE + + ") AND \"CUR\".\"WS_OPERATION_TYPE\" IN(" + OperationType.UPDATE+ ", " + + OperationType.DELETE + "))"); LOG.debug("Going to execute query: <" + writeConflictQuery + ">"); return stmt.executeQuery(writeConflictQuery); } @@ -1566,7 +1432,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { "' FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = " + txnid + //we only track compactor activity in TXN_COMPONENTS to handle the case where the //compactor txn aborts - so don't bother copying it to COMPLETED_TXN_COMPONENTS - " AND \"TC_OPERATION_TYPE\" <> " + 
quoteChar(OperationType.COMPACT.sqlConst); + " AND \"TC_OPERATION_TYPE\" <> " + OperationType.COMPACT; LOG.debug("Going to execute insert <" + s + ">"); if ((stmt.executeUpdate(s)) < 1) { @@ -1591,7 +1457,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } queryBatch.add("DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = " + txnid); // DO NOT remove the transaction from the TXN table, the cleaner will remove it when appropriate - queryBatch.add("UPDATE \"TXNS\" SET \"TXN_STATE\" = " + quoteChar(TXN_COMMITTED) + " WHERE \"TXN_ID\" = " + txnid); + queryBatch.add("UPDATE \"TXNS\" SET \"TXN_STATE\" = " + TxnStatus.COMMITTED + " WHERE \"TXN_ID\" = " + txnid); if (txnType == TxnType.MATER_VIEW_REBUILD) { queryBatch.add("DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid); } @@ -2235,7 +2101,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { stmt = dbConn.createStatement(); long minOpenTxn; - rs = stmt.executeQuery("SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\"=" + quoteChar(TXN_OPEN)); + rs = stmt.executeQuery("SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\"=" + TxnStatus.OPEN); if (!rs.next()) { throw new IllegalStateException("Scalar query returned no rows?!?!!"); } @@ -2373,7 +2239,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { throws MetaException { if (LOG.isDebugEnabled()) { - LOG.debug("Acquiring lock for materialization rebuild with txnId={} for {}", txnId, Warehouse.getQualifiedName(dbName,tableName)); + LOG.debug("Acquiring lock for materialization rebuild with {} for {}", + JavaUtils.txnIdToString(txnId), TableName.getDbTable(dbName, tableName)); } TxnStore.MutexAPI.LockHandle handle = null; @@ -2447,7 +2314,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { if (rc < 1) { LOG.debug("Going to rollback"); dbConn.rollback(); - LOG.info("No lock found for rebuild of " + Warehouse.getQualifiedName(dbName, tableName) + + LOG.info("No 
lock found for rebuild of " + TableName.getDbTable(dbName, tableName) + " when trying to heartbeat"); // It could not be renewed, return that information return false; @@ -2460,7 +2327,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { LOG.debug("Going to rollback"); rollbackDBConn(dbConn); checkRetryable(dbConn, e, - "heartbeatLockMaterializationRebuild(" + Warehouse.getQualifiedName(dbName, tableName) + ", " + txnId + ")"); + "heartbeatLockMaterializationRebuild(" + TableName.getDbTable(dbName, tableName) + ", " + txnId + ")"); throw new MetaException("Unable to heartbeat rebuild lock due to " + StringUtils.stringifyException(e)); } finally { @@ -2573,9 +2440,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { * @throws SQLException * @throws MetaException */ - private TxnRecord lockTransactionRecord(Statement stmt, long txnId, Character txnState) throws SQLException, MetaException { + private TxnRecord lockTransactionRecord(Statement stmt, long txnId, TxnStatus txnState) + throws SQLException, MetaException { String query = "SELECT \"TXN_TYPE\" FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnId - + (txnState != null ? " AND \"TXN_STATE\" = " + quoteChar(txnState) : ""); + + (txnState != null ? " AND \"TXN_STATE\" = " + txnState : ""); try (ResultSet rs = stmt.executeQuery(sqlGenerator.addForUpdateClause(query))) { return rs.next() ? new TxnRecord(rs.getInt(1)) : null; } @@ -2596,9 +2464,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { * 1. We use S4U (withe read_committed) to generate the next (ext) lock id. This serializes * any 2 {@code enqueueLockWithRetry()} calls. * 2. 
We use S4U on the relevant TXNS row to block any concurrent abort/commit/etc operations - * @see #checkLockWithRetry(Connection, long, long) + * @see #checkLockWithRetry(Connection, long, long, boolean) */ - private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { + private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) + throws NoSuchTxnException, TxnAbortedException, MetaException { boolean success = false; Connection dbConn = null; try { @@ -2610,7 +2479,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { stmt = dbConn.createStatement(); if (isValidTxn(txnid)) { //this also ensures that txn is still there in expected state - TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TXN_OPEN); + TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TxnStatus.OPEN); if (txnRecord == null) { ensureValidTxn(dbConn, txnid, stmt); shouldNeverHappen(txnid); @@ -2704,7 +2573,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { pstmt.setString(2, dbName); pstmt.setString(3, tblName); pstmt.setString(4, partName); - pstmt.setString(5, OperationType.fromDataOperationType(lc.getOperationType()).toString()); + pstmt.setString(5, OperationType.fromDataOperationType(lc.getOperationType()).getSqlConst()); pstmt.setObject(6, writeId.orElse(null)); pstmt.addBatch(); @@ -2889,12 +2758,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { * The clients that operate in blocking mode, can't heartbeat a lock until the lock is acquired. * We should make CheckLockRequest include timestamp or last request to skip unnecessary heartbeats. Thrift change. * - * {@link #checkLock(java.sql.Connection, long, long)} must run at SERIALIZABLE (make sure some lock we are checking - * against doesn't move from W to A in another txn) but this method can heartbeat in - * separate txn at READ_COMMITTED. 
+ * {@link #checkLock(java.sql.Connection, long, long, boolean)} must run at SERIALIZABLE + * (make sure some lock we are checking against doesn't move from W to A in another txn) + * but this method can heartbeat in separate txn at READ_COMMITTED. * * Retry-by-caller note: - * Retryable because {@link #checkLock(Connection, long, long)} is + * Retryable because {@link #checkLock(Connection, long, long, boolean)} is */ @Override @RetrySemantics.SafeToRetry @@ -3193,7 +3062,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } TxnUtils.buildQueryWithINClause(conf, queries, new StringBuilder("UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + TxnDbUtil.getEpochFn(dbProduct) + - " WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN) + " AND "), + " WHERE \"TXN_STATE\" = " + TxnStatus.OPEN + " AND "), new StringBuilder(""), txnIds, "\"TXN_ID\"", true, false); int updateCnt = 0; for (String query : queries) { @@ -3516,7 +3385,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { lockInternal(); dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - TxnRecord txnRecord = lockTransactionRecord(stmt, rqst.getTxnid(), TXN_OPEN); + TxnRecord txnRecord = lockTransactionRecord(stmt, rqst.getTxnid(), TxnStatus.OPEN); if (txnRecord == null) { //ensures txn is still there and in expected state ensureValidTxn(dbConn, rqst.getTxnid(), stmt); @@ -3536,7 +3405,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { pstmt.setString(2, normalizeCase(rqst.getDbname())); pstmt.setString(3, normalizeCase(rqst.getTablename())); pstmt.setString(4, partName); - pstmt.setString(5, Character.toString(ot.sqlConst)); + pstmt.setString(5, ot.getSqlConst()); pstmt.setObject(6, writeId); pstmt.addBatch(); @@ -4424,8 +4293,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { StringBuilder suffix = new StringBuilder(); // add update txns queries to query list - prefix.append("UPDATE \"TXNS\" SET 
\"TXN_STATE\" = ").append(quoteChar(TXN_ABORTED)) - .append(" WHERE \"TXN_STATE\" = ").append(quoteChar(TXN_OPEN)).append(" AND "); + prefix.append("UPDATE \"TXNS\" SET \"TXN_STATE\" = ").append(TxnStatus.ABORTED) + .append(" WHERE \"TXN_STATE\" = ").append(TxnStatus.OPEN).append(" AND "); if (checkHeartbeat) { suffix.append(" AND \"TXN_LAST_HEARTBEAT\" < ") .append(TxnDbUtil.getEpochFn(dbProduct)).append("-").append(timeout); @@ -4735,7 +4604,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } try (Statement stmt = dbConn.createStatement()) { String s = "UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + TxnDbUtil.getEpochFn(dbProduct) + - " WHERE \"TXN_ID\" = " + txnid + " AND \"TXN_STATE\" = '" + TXN_OPEN + "'"; + " WHERE \"TXN_ID\" = " + txnid + " AND \"TXN_STATE\" = " + TxnStatus.OPEN; LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); if (rc < 1) { @@ -4771,16 +4640,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { // could also check WRITE_SET but that seems overkill return TxnStatus.UNKNOWN; } - char txnState = rs.getString(1).charAt(0); - if (txnState == TXN_ABORTED) { - return TxnStatus.ABORTED; - } - if (txnState == TXN_COMMITTED) { - return TxnStatus.COMMITTED; - } - assert txnState == TXN_OPEN : "we found it in TXNS but it's not ABORTED, so must be OPEN"; + return TxnStatus.fromString(rs.getString(1)); } - return TxnStatus.OPEN; } /** @@ -4795,8 +4656,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { // Get the count of txns from the given list that are in open state and not read-only. // If the returned count is same as the input number of txns, then all txns are in open state and not read-only. 
- prefix.append("SELECT COUNT(*) FROM \"TXNS\" WHERE \"TXN_STATE\" = '" + TXN_OPEN - + "' AND \"TXN_TYPE\" != " + TxnType.READ_ONLY.getValue() + " AND "); + prefix.append("SELECT COUNT(*) FROM \"TXNS\" WHERE \"TXN_STATE\" = " + TxnStatus.OPEN + + " AND \"TXN_TYPE\" != " + TxnType.READ_ONLY.getValue() + " AND "); TxnUtils.buildQueryWithINClause(conf, queries, prefix, new StringBuilder(), txnIds, "\"TXN_ID\"", false, false); @@ -4833,10 +4694,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { try (ResultSet rs = stmt.executeQuery(query)) { while (rs.next()) { long txnId = rs.getLong(1); - char txnState = rs.getString(2).charAt(0); + TxnStatus txnState = TxnStatus.fromString(rs.getString(2)); TxnType txnType = TxnType.findByValue(rs.getInt(3)); - if (txnState != TXN_OPEN) { + if (txnState != TxnStatus.OPEN) { txnInfo.append("{").append(txnId).append(",").append(txnState).append("}"); } else if (txnType == TxnType.READ_ONLY) { txnInfo.append("{").append(txnId).append(",read-only}"); @@ -4920,7 +4781,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { throw new NoSuchTxnException("No such transaction " + JavaUtils.txnIdToString(txnid)); } } - if (rs.getString(1).charAt(0) == TXN_ABORTED) { + if (TxnStatus.fromString(rs.getString(1)) == TxnStatus.ABORTED) { LOG.debug("Going to rollback"); rollbackDBConn(dbConn); throw new TxnAbortedException("Transaction " + JavaUtils.txnIdToString(txnid) @@ -5047,8 +4908,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { timeOutLocks(dbConn); while(true) { stmt = dbConn.createStatement(); - String s = " \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = '" + TXN_OPEN + - "' AND \"TXN_LAST_HEARTBEAT\" < " + TxnDbUtil.getEpochFn(dbProduct) + "-" + timeout + + String s = " \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + TxnStatus.OPEN + + " AND \"TXN_LAST_HEARTBEAT\" < " + TxnDbUtil.getEpochFn(dbProduct) + "-" + timeout + " AND \"TXN_TYPE\" != " + 
TxnType.REPL_CREATED.getValue(); //safety valve for extreme cases s = sqlGenerator.addLimitClause(10 * TIMED_OUT_TXN_ABORT_BATCH_SIZE, s); @@ -5108,7 +4969,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "SELECT COUNT(*) FROM \"TXNS\" WHERE \"TXN_STATE\" = '" + TXN_OPEN + "'"; + String s = "SELECT COUNT(*) FROM \"TXNS\" WHERE \"TXN_STATE\" = " + TxnStatus.OPEN; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); if (!rs.next()) { @@ -5499,6 +5360,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { public boolean isWrapperFor(Class<?> iface) throws SQLException { throw new UnsupportedOperationException(); } - }; + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStatus.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStatus.java new file mode 100644 index 0000000..a9ad560 --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStatus.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import org.apache.hadoop.hive.metastore.api.TxnState;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+
+/**
+ * Valid values for TXNS.TXN_STATE; toString() renders the single-quoted SQL literal.
+ */
+public enum TxnStatus {
+  OPEN('o', TxnState.OPEN),
+  ABORTED('a', TxnState.ABORTED),
+  COMMITTED('c', TxnState.COMMITTED),
+  UNKNOWN('u', null); // no TxnState counterpart: toTxnState() rejects it
+
+  private final char sqlConst;
+  private final TxnState txnState;
+  // Reverse index from the one-character TXN_STATE value (see getSqlConst()) to its constant.
+  private static final Map<String, TxnStatus> LOOKUP =
+      Arrays.stream(TxnStatus.values()).collect(toMap(TxnStatus::getSqlConst, identity()));
+
+  TxnStatus(char sqlConst, TxnState txnState) {
+    this.sqlConst = sqlConst;
+    this.txnState = txnState;
+  }
+
+  @Override
+  public String toString() {
+    return "'" + getSqlConst() + "'";
+  }
+
+  public String getSqlConst() {
+    return Character.toString(sqlConst);
+  }
+
+  public TxnState toTxnState() {
+    return Optional.ofNullable(txnState).orElseThrow(IllegalArgumentException::new);
+  }
+
+  public static TxnStatus fromString(String sqlConst) {
+    return Optional.ofNullable(LOOKUP.get(sqlConst)).orElseThrow(IllegalArgumentException::new);
+  }
+}