Repository: hive
Updated Branches:
  refs/heads/master aae392ff8 -> d8c500b89


HIVE-20942 Worker should heartbeat its own txn (Igor Kryvenko via Eugene 
Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d8c500b8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d8c500b8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d8c500b8

Branch: refs/heads/master
Commit: d8c500b896daf4c63b08bb4788fbc5ee374292b8
Parents: aae392f
Author: Igor Kryvenko <kryvenko7i...@gmail.com>
Authored: Fri Dec 7 15:04:49 2018 -0800
Committer: Eugene Koifman <ekoif...@apache.org>
Committed: Fri Dec 7 15:04:49 2018 -0800

----------------------------------------------------------------------
 .../hadoop/hive/ql/txn/compactor/Worker.java    | 62 +++++++++++++++++---
 1 file changed, 55 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d8c500b8/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 939f8c7..4a1cac1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -29,7 +29,9 @@ import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.mapred.JobConf;
 import org.slf4j.Logger;
@@ -53,6 +55,7 @@ import java.security.PrivilegedExceptionAction;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -89,6 +92,7 @@ public class Worker extends CompactorThread {
       boolean launchedJob = false;
       // Make sure nothing escapes this run method and kills the metastore at 
large,
       // so wrap it in a big catch Throwable statement.
+      CompactionHeartbeater heartbeater = null;
       try {
         final CompactionInfo ci = txnHandler.findNextToCompact(workerName);
         LOG.debug("Processing compaction request " + ci);
@@ -176,13 +180,8 @@ public class Worker extends CompactorThread {
         //todo: now we can update compaction_queue entry with this id
         //also make sure to write to TXN_COMPONENTS so that if txn aborts, we 
don't delete the metadata about it from TXNS!!!!
 
-        HeartbeatRequest heartbeatRequest = new HeartbeatRequest();
-        heartbeatRequest.setTxnid(compactorTxnId);
-        heartbeatRequest.setLockid(0);
-        /**
-         * todo: now set up a thread to do the heartbeat
-         */
-        txnHandler.heartbeat(heartbeatRequest);
+        heartbeater = new CompactionHeartbeater(txnHandler, compactorTxnId, 
fullTableName, conf);
+        heartbeater.start();
 
         ValidTxnList validTxnList = 
TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), compactorTxnId);
         GetValidWriteIdsRequest rqst = new 
GetValidWriteIdsRequest(Collections.singletonList(fullTableName));
@@ -225,6 +224,7 @@ public class Worker extends CompactorThread {
                   ci.getFullPartitionName(), exception);
             }
           }
+          heartbeater.cancel();
           txnHandler.markCompacted(ci);
           txnHandler.commitTxn(new CommitTxnRequest(compactorTxnId));
           if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
@@ -239,6 +239,10 @@ public class Worker extends CompactorThread {
       } catch (Throwable t) {
         LOG.error("Caught an exception in the main loop of compactor worker " 
+ workerName + ", " +
             StringUtils.stringifyException(t));
+      } finally {
+        if(heartbeater != null) {
+          heartbeater.cancel();
+        }
       }
 
       // If we didn't try to launch a job it either means there was no work to 
do or we got
@@ -365,4 +369,48 @@ public class Worker extends CompactorThread {
       }
     }
   }
+
+  /**
+   * Background thread that repeatedly sends a heartbeat for the compactor's
+   * own transaction through the supplied {@link TxnStore} until
+   * {@link #cancel()} is called or a heartbeat fails.
+   *
+   * The pause between heartbeats is half of
+   * {@code MetastoreConf.ConfVars.TXN_TIMEOUT} (presumably so that the
+   * transaction is refreshed before the metastore would time it out).
+   * The thread is a daemon running at minimum priority.
+   */
+  static final class CompactionHeartbeater extends Thread {
+    private static final Logger LOG =
+        LoggerFactory.getLogger(CompactionHeartbeater.class);
+    private final TxnStore txnHandler;
+    private final AtomicBoolean stop = new AtomicBoolean();
+    /** Monitor used to pause between heartbeats and to wake up on cancel. */
+    private final Object wakeUp = new Object();
+    private final long compactorTxnId;
+    private final String tableName;
+    private final long interval;
+
+    /**
+     * @param txnHandler store used to send the heartbeats
+     * @param compactorTxnId id of the transaction to keep alive
+     * @param tableName table being compacted; used for logging only
+     * @param conf configuration the heartbeat interval is derived from
+     */
+    public CompactionHeartbeater(TxnStore txnHandler, long compactorTxnId,
+        String tableName, HiveConf conf) {
+      this.txnHandler = txnHandler;
+      this.tableName = tableName;
+      this.compactorTxnId = compactorTxnId;
+      long halfTimeout = MetastoreConf.getTimeVar(conf,
+          MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS) / 2;
+      // Object.wait(0) blocks until notified, so never let the interval
+      // drop to zero; this also avoids a busy loop for tiny timeouts.
+      this.interval = Math.max(1L, halfTimeout);
+      setDaemon(true);
+      setPriority(MIN_PRIORITY);
+      setName("CompactionHeartbeater-" + compactorTxnId);
+    }
+
+    /**
+     * Sends a heartbeat, then waits for the interval or for
+     * {@link #cancel()}, whichever comes first. Any exception ends the
+     * loop and is logged together with its stack trace.
+     */
+    @Override
+    public void run() {
+      try {
+        LOG.debug("Heartbeating compaction transaction id {} for table: {}",
+            compactorTxnId, tableName);
+        HeartbeatRequest heartbeatRequest = new HeartbeatRequest();
+
+        heartbeatRequest.setTxnid(compactorTxnId);
+        heartbeatRequest.setLockid(0);
+        while (!stop.get()) {
+          txnHandler.heartbeat(heartbeatRequest);
+          synchronized (wakeUp) {
+            // Re-check under the monitor so that a cancel() issued while
+            // the heartbeat was in flight is not missed.
+            if (!stop.get()) {
+              wakeUp.wait(interval);
+            }
+          }
+        }
+      } catch (InterruptedException e) {
+        // Preserve the interrupt status for whoever owns this thread.
+        Thread.currentThread().interrupt();
+        LOG.warn("Interrupted while heartbeating txn {} in {}",
+            compactorTxnId, Thread.currentThread().getName());
+      } catch (Exception e) {
+        // Pass the exception itself as the last argument so that SLF4J
+        // logs the cause and stack trace instead of dropping it.
+        LOG.error("Error while heartbeating txn {} in {}, error: ",
+            compactorTxnId, Thread.currentThread().getName(), e);
+      }
+    }
+
+    /**
+     * Asks the heartbeater to stop and wakes it up if it is pausing
+     * between heartbeats. Calling it more than once is harmless.
+     */
+    public void cancel() {
+      synchronized (wakeUp) {
+        if (stop.compareAndSet(false, true)) {
+          LOG.debug("Successfully stop the heartbeating the transaction {}",
+              this.compactorTxnId);
+          wakeUp.notifyAll();
+        }
+      }
+    }
+  }
 }

Reply via email to