Repository: hive Updated Branches: refs/heads/master aae392ff8 -> d8c500b89
HIVE-20942 Worker should heartbeat its own txn (Igor Kryvenko via Eugene Koifman) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d8c500b8 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d8c500b8 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d8c500b8 Branch: refs/heads/master Commit: d8c500b896daf4c63b08bb4788fbc5ee374292b8 Parents: aae392f Author: Igor Kryvenko <kryvenko7i...@gmail.com> Authored: Fri Dec 7 15:04:49 2018 -0800 Committer: Eugene Koifman <ekoif...@apache.org> Committed: Fri Dec 7 15:04:49 2018 -0800 ---------------------------------------------------------------------- .../hadoop/hive/ql/txn/compactor/Worker.java | 62 +++++++++++++++++--- 1 file changed, 55 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/d8c500b8/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index 939f8c7..4a1cac1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -29,7 +29,9 @@ import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils; +import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.mapred.JobConf; import org.slf4j.Logger; @@ -53,6 +55,7 @@ import java.security.PrivilegedExceptionAction; import 
java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -89,6 +92,7 @@ public class Worker extends CompactorThread { boolean launchedJob = false; // Make sure nothing escapes this run method and kills the metastore at large, // so wrap it in a big catch Throwable statement. + CompactionHeartbeater heartbeater = null; try { final CompactionInfo ci = txnHandler.findNextToCompact(workerName); LOG.debug("Processing compaction request " + ci); @@ -176,13 +180,8 @@ public class Worker extends CompactorThread { //todo: now we can update compaction_queue entry with this id //also make sure to write to TXN_COMPONENTS so that if txn aborts, we don't delete the metadata about it from TXNS!!!! - HeartbeatRequest heartbeatRequest = new HeartbeatRequest(); - heartbeatRequest.setTxnid(compactorTxnId); - heartbeatRequest.setLockid(0); - /** - * todo: now set up a thread to do the heartbeat - */ - txnHandler.heartbeat(heartbeatRequest); + heartbeater = new CompactionHeartbeater(txnHandler, compactorTxnId, fullTableName, conf); + heartbeater.start(); ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), compactorTxnId); GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName)); @@ -225,6 +224,7 @@ public class Worker extends CompactorThread { ci.getFullPartitionName(), exception); } } + heartbeater.cancel(); txnHandler.markCompacted(ci); txnHandler.commitTxn(new CommitTxnRequest(compactorTxnId)); if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) { @@ -239,6 +239,10 @@ public class Worker extends CompactorThread { } catch (Throwable t) { LOG.error("Caught an exception in the main loop of compactor worker " + workerName + ", " + StringUtils.stringifyException(t)); + } finally { + if(heartbeater != null) { + heartbeater.cancel(); + } } // If we didn't try to launch a job it either means there was 
no work to do or we got @@ -365,4 +369,48 @@ public class Worker extends CompactorThread { } } }
+
+  /**
+   * Background thread that keeps the compactor's own transaction alive.
+   *
+   * <p>Once started it sends a heartbeat for {@code compactorTxnId} through the supplied
+   * {@link TxnStore}, sleeps for half of {@code MetastoreConf.ConfVars.TXN_TIMEOUT}, and repeats
+   * until {@link #cancel()} has been called. The thread is a daemon running at
+   * {@link Thread#MIN_PRIORITY} and is named {@code CompactionHeartbeater-<txnId>}.
+   *
+   * <p>{@link #cancel()} only raises a flag; it does not interrupt the thread, so the loop
+   * notices the request after the heartbeat or sleep that is currently in progress finishes.
+   */
+  static final class CompactionHeartbeater extends Thread {
+    static final private Logger LOG = LoggerFactory.getLogger(CompactionHeartbeater.class);
+    private final TxnStore txnHandler;
+    /** Set to {@code true} by {@link #cancel()}; polled before every heartbeat. */
+    private final AtomicBoolean stop = new AtomicBoolean();
+    private final long compactorTxnId;
+    /** Used only for log output. */
+    private final String tableName;
+    private final HiveConf conf;
+    /** Pause between two heartbeats, in milliseconds (half of the transaction timeout). */
+    private final long interval;
+
+    /**
+     * @param txnHandler     store through which the heartbeats are sent
+     * @param compactorTxnId id of the transaction to heartbeat
+     * @param tableName      table being compacted; appears in log messages only
+     * @param conf           configuration from which the transaction timeout is read
+     */
+    public CompactionHeartbeater(TxnStore txnHandler, long compactorTxnId, String tableName, HiveConf conf) {
+      this.txnHandler = txnHandler;
+      this.tableName = tableName;
+      this.compactorTxnId = compactorTxnId;
+      this.conf = conf;
+      this.interval =
+          MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS) / 2;
+      setDaemon(true);
+      setPriority(MIN_PRIORITY);
+      setName("CompactionHeartbeater-" + compactorTxnId);
+    }
+
+    /**
+     * Heartbeats the transaction in a loop until {@link #cancel()} is called or an exception
+     * occurs. Any exception ends the loop and is logged together with its stack trace.
+     */
+    @Override
+    public void run() {
+      try {
+        LOG.debug("Heartbeating compaction transaction id {} for table: {}", compactorTxnId, tableName);
+        // The same request object is reused for every heartbeat; lock id 0 is sent with it.
+        HeartbeatRequest heartbeatRequest = new HeartbeatRequest();
+        heartbeatRequest.setTxnid(compactorTxnId);
+        heartbeatRequest.setLockid(0);
+        while (!stop.get()) {
+          txnHandler.heartbeat(heartbeatRequest);
+          Thread.sleep(interval);
+        }
+      } catch (Exception e) {
+        if (e instanceof InterruptedException) {
+          // Preserve the interrupt status for anything that inspects this thread afterwards.
+          Thread.currentThread().interrupt();
+        }
+        // Pass the exception itself as the trailing argument: SLF4J then logs it as the
+        // throwable (with stack trace). A trailing String without a placeholder is dropped.
+        LOG.error("Error while heartbeating txn {} in {}, error: ", compactorTxnId,
+            Thread.currentThread().getName(), e);
+      }
+    }
+
+    /**
+     * Asks the heartbeat loop to stop. Safe to call more than once and from any thread; only
+     * the call that actually flips the flag writes the debug message.
+     */
+    public void cancel() {
+      if (this.stop.compareAndSet(false, true)) {
+        LOG.debug("Successfully stop the heartbeating the transaction {}", this.compactorTxnId);
+      }
+    }
+  }
 }