HDFS-4176. EditLogTailer should call rollEdits with a timeout. (Lei Xu)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/67406460
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/67406460
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/67406460

Branch: refs/heads/HADOOP-12756
Commit: 67406460f0b6c05edde1d1185aeb42b6324df202
Parents: 328c855
Author: Lei Xu <l...@apache.org>
Authored: Thu Jul 28 13:32:00 2016 -0700
Committer: Lei Xu <l...@apache.org>
Committed: Thu Jul 28 13:32:00 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  3 +
 .../hdfs/server/namenode/ha/EditLogTailer.java  | 76 +++++++++++++++-----
 .../src/main/resources/hdfs-default.xml         |  7 ++
 .../server/namenode/ha/TestEditLogTailer.java   | 46 ++++++++++++
 4 files changed, 116 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/67406460/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 231dea7..3385751 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -731,6 +731,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_HA_TAILEDITS_INPROGRESS_KEY =
           "dfs.ha.tail-edits.in-progress";
   public static final boolean DFS_HA_TAILEDITS_INPROGRESS_DEFAULT = false;
+  public static final String DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY =
+      "dfs.ha.tail-edits.rolledits.timeout";
+  public static final int DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_DEFAULT = 60; // 1m
   public static final String DFS_HA_LOGROLL_RPC_TIMEOUT_KEY = "dfs.ha.log-roll.rpc.timeout";
   public static final int DFS_HA_LOGROLL_RPC_TIMEOUT_DEFAULT = 20000; // 20s
   public static final String DFS_HA_FENCE_METHODS_KEY = "dfs.ha.fencing.methods";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67406460/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
index 1447375..95c3d58 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
@@ -27,16 +27,21 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -102,6 +107,17 @@ public class EditLogTailer {
   private final long logRollPeriodMs;
 
   /**
+   * The timeout in milliseconds of calling rollEdits RPC to Active NN.
+   * @see HDFS-4176.
+   */
+  private final long rollEditsTimeoutMs;
+
+  /**
+   * The executor to run roll edit RPC call in a daemon thread.
+   */
+  private final ExecutorService rollEditsRpcExecutor;
+
+  /**
    * How often the Standby should check if there are new finalized segment(s)
    * available to be read from.
    */
@@ -159,6 +175,13 @@ public class EditLogTailer {
     sleepTimeMs = conf.getInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY,
         DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_DEFAULT) * 1000;
 
+    rollEditsTimeoutMs = conf.getInt(
+        DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY,
+        DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_DEFAULT) * 1000;
+
+    rollEditsRpcExecutor = Executors.newSingleThreadExecutor(
+        new ThreadFactoryBuilder().setDaemon(true).build());
+
     maxRetries = conf.getInt(DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY,
       DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_DEFAULT);
     if (maxRetries <= 0) {
@@ -186,6 +209,7 @@ public class EditLogTailer {
   }
   
   public void stop() throws IOException {
+    rollEditsRpcExecutor.shutdown();
     tailerThread.setShouldRun(false);
     tailerThread.interrupt();
     try {
@@ -205,7 +229,7 @@ public class EditLogTailer {
   public void setEditLog(FSEditLog editLog) {
     this.editLog = editLog;
   }
-  
+
   public void catchupDuringFailover() throws IOException {
     Preconditions.checkState(tailerThread == null ||
         !tailerThread.isAlive(),
@@ -300,30 +324,50 @@ public class EditLogTailer {
   }
 
   /**
+   * NameNodeProxy factory method.
+   * @return a Callable to roll logs on remote NameNode.
+   */
+  @VisibleForTesting
+  Callable<Void> getNameNodeProxy() {
+    return new MultipleNameNodeProxy<Void>() {
+      @Override
+      protected Void doWork() throws IOException {
+        cachedActiveProxy.rollEditLog();
+        return null;
+      }
+    };
+  }
+
+  /**
    * Trigger the active node to roll its logs.
    */
-  private void triggerActiveLogRoll() {
+  @VisibleForTesting
+  void triggerActiveLogRoll() {
     LOG.info("Triggering log roll on remote NameNode");
+    Future<Void> future = null;
     try {
-      new MultipleNameNodeProxy<Void>() {
-        @Override
-        protected Void doWork() throws IOException {
-          cachedActiveProxy.rollEditLog();
-          return null;
-        }
-      }.call();
+      future = rollEditsRpcExecutor.submit(getNameNodeProxy());
+      future.get(rollEditsTimeoutMs, TimeUnit.MILLISECONDS);
       lastRollTriggerTxId = lastLoadedTxnId;
-    } catch (IOException ioe) {
-      if (ioe instanceof RemoteException) {
-        ioe = ((RemoteException)ioe).unwrapRemoteException();
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof RemoteException) {
+        IOException ioe = ((RemoteException) cause).unwrapRemoteException();
         if (ioe instanceof StandbyException) {
           LOG.info("Skipping log roll. Remote node is not in Active state: " +
               ioe.getMessage().split("\n")[0]);
           return;
         }
       }
-
-      LOG.warn("Unable to trigger a roll of the active NN", ioe);
+      LOG.warn("Unable to trigger a roll of the active NN", e);
+    } catch (TimeoutException e) {
+      if (future != null) {
+        future.cancel(true);
+      }
+      LOG.warn(String.format(
+          "Unable to finish rolling edits in %d ms", rollEditsTimeoutMs));
+    } catch (InterruptedException e) {
+      LOG.warn("Unable to trigger a roll of the active NN", e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67406460/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index b82fa31..e6fde8c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1496,6 +1496,13 @@
 </property>
 
 <property>
+  <name>dfs.ha.tail-edits.rolledits.timeout</name>
+  <value>60</value>
+  <description>The timeout in seconds of calling rollEdits RPC on Active NN.
+  </description>
+</property>
+
+<property>
   <name>dfs.ha.automatic-failover.enabled</name>
   <value>false</value>
   <description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67406460/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
index 3af201d..0d0873d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
@@ -17,15 +17,19 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.ha;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -50,6 +54,7 @@ import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.base.Supplier;
+import org.mockito.Mockito;
 
 @RunWith(Parameterized.class)
 public class TestEditLogTailer {
@@ -249,4 +254,45 @@ public class TestEditLogTailer {
       }
     }, 100, 10000);
   }
+
+  @Test(timeout=20000)
+  public void testRollEditTimeoutForActiveNN() throws IOException {
+    Configuration conf = getConf();
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY, 5); // 5s
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY, 100);
+
+    HAUtil.setAllowStandbyReads(conf, true);
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .numDataNodes(0)
+        .build();
+    cluster.waitActive();
+
+    cluster.transitionToActive(0);
+
+    try {
+      EditLogTailer tailer = Mockito.spy(
+          cluster.getNamesystem(1).getEditLogTailer());
+      AtomicInteger flag = new AtomicInteger(0);
+
+      // Return a slow roll edit process.
+      when(tailer.getNameNodeProxy()).thenReturn(
+          new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+              Thread.sleep(30000);  // sleep for 30 seconds.
+              assertTrue(Thread.currentThread().isInterrupted());
+              flag.addAndGet(1);
+              return null;
+            }
+          }
+      );
+      tailer.triggerActiveLogRoll();
+      assertEquals(0, flag.get());
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to