HDFS-11660. TestFsDatasetCache#testPageRounder fails intermittently with AssertionError.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/74a72385
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/74a72385
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/74a72385

Branch: refs/heads/YARN-5355
Commit: 74a723852d9ef265d0fd46334f2f885bfb247a3a
Parents: c154935
Author: Andrew Wang <w...@apache.org>
Authored: Wed Apr 19 18:10:04 2017 -0700
Committer: Andrew Wang <w...@apache.org>
Committed: Wed Apr 19 18:10:04 2017 -0700

----------------------------------------------------------------------
 .../hdfs/server/datanode/BPServiceActor.java    |  3 +
 .../server/datanode/DataNodeFaultInjector.java  |  6 +-
 .../server/datanode/TestDataNodeMetrics.java    |  7 ++-
 .../server/datanode/TestFsDatasetCache.java     | 62 ++++++++++++++++----
 .../shortcircuit/TestShortCircuitCache.java     |  8 ++-
 5 files changed, 67 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/74a72385/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 21e2a3b..c1cda57 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -623,6 +623,7 @@ class BPServiceActor implements Runnable {
     //
     while (shouldRun()) {
       try {
+        DataNodeFaultInjector.get().startOfferService();
         final long startTime = scheduler.monotonicNow();
 
         //
@@ -725,6 +726,8 @@ class BPServiceActor implements Runnable {
       } catch (IOException e) {
         LOG.warn("IOException in offerService", e);
         sleepAfterException();
+      } finally {
+        DataNodeFaultInjector.get().endOfferService();
       }
       processQueueMessages();
     } // while (shouldRun())
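
The two hooks bracket every pass of the offer-service loop: startOfferService() runs at the top of the try block and endOfferService() in the finally clause, so whatever a test-supplied injector acquires in the first hook is released before the next iteration, even when offerService throws. A minimal sketch of the same shape, with illustrative names (doOneRound() stands in for the heartbeat work; this is an illustration, not code from the patch):

    while (shouldRun()) {
      try {
        DataNodeFaultInjector.get().startOfferService(); // a test hook may block here
        doOneRound();                                    // heartbeat/report work
      } catch (IOException e) {
        LOG.warn("IOException in offerService", e);
      } finally {
        DataNodeFaultInjector.get().endOfferService();   // always paired with start
      }
    }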

http://git-wip-us.apache.org/repos/asf/hadoop/blob/74a72385/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index b74d2c9..d2d557f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -30,7 +30,7 @@ import java.io.IOException;
 @VisibleForTesting
 @InterfaceAudience.Private
 public class DataNodeFaultInjector {
-  public static DataNodeFaultInjector instance = new DataNodeFaultInjector();
+  private static DataNodeFaultInjector instance = new DataNodeFaultInjector();
 
   public static DataNodeFaultInjector get() {
     return instance;
@@ -81,4 +81,8 @@ public class DataNodeFaultInjector {
 
   public void failPipeline(ReplicaInPipeline replicaInfo,
       String mirrorAddr) throws IOException { }
+
+  public void startOfferService() throws Exception {}
+
+  public void endOfferService() throws Exception {}
 }
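
Making the instance field private forces tests through get()/set(), which supports the usual install-and-restore idiom: save the current injector, swap in a subclass that overrides the new hooks, and restore the original in a finally block. The set() method is not shown in this hunk, but the test changes below call it. A minimal sketch of the idiom, assuming a JUnit-style test body:

    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
    DataNodeFaultInjector.set(new DataNodeFaultInjector() {
      @Override
      public void startOfferService() throws Exception {
        // block or delay the BPServiceActor thread here
      }
    });
    try {
      // ... test body runs while the hook is active ...
    } finally {
      DataNodeFaultInjector.set(oldInjector); // always restore the previous injector
    }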

http://git-wip-us.apache.org/repos/asf/hadoop/blob/74a72385/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
index a404259..9abc19d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
@@ -32,6 +32,7 @@ import java.util.List;
 import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
 
+import net.jcip.annotations.NotThreadSafe;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -58,6 +59,7 @@ import org.mockito.Mockito;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+@NotThreadSafe
 public class TestDataNodeMetrics {
   private static final Log LOG = LogFactory.getLog(TestDataNodeMetrics.class);
 
@@ -216,6 +218,7 @@ public class TestDataNodeMetrics {
         new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
 
     final List<FSDataOutputStream> streams = Lists.newArrayList();
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
     try {
       final FSDataOutputStream out =
           cluster.getFileSystem().create(path, (short) 2);
@@ -224,7 +227,7 @@ public class TestDataNodeMetrics {
       Mockito.doThrow(new IOException("mock IOException")).
           when(injector).
           writeBlockAfterFlush();
-      DataNodeFaultInjector.instance = injector;
+      DataNodeFaultInjector.set(injector);
       streams.add(out);
       out.writeBytes("old gs data\n");
       out.hflush();
@@ -250,7 +253,7 @@ public class TestDataNodeMetrics {
       if (cluster != null) {
         cluster.shutdown();
       }
-      DataNodeFaultInjector.instance = new DataNodeFaultInjector();
+      DataNodeFaultInjector.set(oldInjector);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/74a72385/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
index 28bf13b..2dbd5b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import net.jcip.annotations.NotThreadSafe;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
@@ -34,6 +35,8 @@ import java.nio.channels.FileChannel;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -80,8 +83,10 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.MetricsAsserts;
 import org.apache.log4j.Logger;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
@@ -91,6 +96,7 @@ import com.google.common.primitives.Ints;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY;
 
+@NotThreadSafe
 public class TestFsDatasetCache {
   private static final Log LOG = LogFactory.getLog(TestFsDatasetCache.class);
 
@@ -110,13 +116,39 @@ public class TestFsDatasetCache {
   private static DataNode dn;
   private static FsDatasetSpi<?> fsd;
   private static DatanodeProtocolClientSideTranslatorPB spyNN;
+  /**
+   * Used to pause DN BPServiceActor threads. BPSA threads acquire the
+   * shared read lock. The test acquires the write lock for exclusive access.
+   */
+  private static ReadWriteLock lock = new ReentrantReadWriteLock(true);
   private static final PageRounder rounder = new PageRounder();
   private static CacheManipulator prevCacheManipulator;
+  private static DataNodeFaultInjector oldInjector;
 
   static {
     LogManager.getLogger(FsDatasetCache.class).setLevel(Level.DEBUG);
   }
 
+  @BeforeClass
+  public static void setUpClass() throws Exception {
+    oldInjector = DataNodeFaultInjector.get();
+    DataNodeFaultInjector.set(new DataNodeFaultInjector() {
+      @Override
+      public void startOfferService() throws Exception {
+        lock.readLock().lock();
+      }
+      @Override
+      public void endOfferService() throws Exception {
+        lock.readLock().unlock();
+      }
+    });
+  }
+
+  @AfterClass
+  public static void tearDownClass() throws Exception {
+    DataNodeFaultInjector.set(oldInjector);
+  }
+
   @Before
   public void setUp() throws Exception {
     conf = new HdfsConfiguration();
@@ -143,7 +175,6 @@ public class TestFsDatasetCache {
     fsd = dn.getFSDataset();
 
     spyNN = InternalDataNodeTestUtils.spyOnBposToNN(dn, nn);
-
   }
 
   @After
@@ -164,18 +195,23 @@ public class TestFsDatasetCache {
   }
 
   private static void setHeartbeatResponse(DatanodeCommand[] cmds)
-      throws IOException {
-    NNHAStatusHeartbeat ha = new NNHAStatusHeartbeat(HAServiceState.ACTIVE,
-        fsImage.getLastAppliedOrWrittenTxId());
-    HeartbeatResponse response =
-        new HeartbeatResponse(cmds, ha, null,
-            ThreadLocalRandom.current().nextLong() | 1L);
-    doReturn(response).when(spyNN).sendHeartbeat(
-        (DatanodeRegistration) any(),
-        (StorageReport[]) any(), anyLong(), anyLong(),
-        anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(),
-        anyBoolean(), any(SlowPeerReports.class),
-        any(SlowDiskReports.class));
+      throws Exception {
+    lock.writeLock().lock();
+    try {
+      NNHAStatusHeartbeat ha = new NNHAStatusHeartbeat(HAServiceState.ACTIVE,
+          fsImage.getLastAppliedOrWrittenTxId());
+      HeartbeatResponse response =
+          new HeartbeatResponse(cmds, ha, null,
+              ThreadLocalRandom.current().nextLong() | 1L);
+      doReturn(response).when(spyNN).sendHeartbeat(
+          (DatanodeRegistration) any(),
+          (StorageReport[]) any(), anyLong(), anyLong(),
+          anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(),
+          anyBoolean(), any(SlowPeerReports.class),
+          any(SlowDiskReports.class));
+    } finally {
+      lock.writeLock().unlock();
+    }
   }
 
   private static DatanodeCommand[] cacheBlock(HdfsBlockLocation loc) {
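
This is the heart of the fix: setHeartbeatResponse() re-stubs the Mockito spy NameNode, which is not safe while a BPServiceActor thread is concurrently invoking it. The test now takes the write lock before swapping the stub, while each actor pass holds the shared read lock through the injector hooks installed in setUpClass(), so no heartbeat can be in flight mid-change. Passing true to ReentrantReadWriteLock makes the lock fair, so a queued writer is not starved by actor threads that re-acquire the read lock on every iteration. The pattern generalizes to any pause-the-workers situation; a self-contained sketch under those assumptions, with illustrative names:

    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class PausableWorkers {
      // Fair mode: a waiting writer blocks later read-lock requests, so
      // workers that re-lock every iteration cannot starve the coordinator.
      private static final ReadWriteLock lock = new ReentrantReadWriteLock(true);

      static void workerLoop() throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
          lock.readLock().lock();
          try {
            // one unit of work, analogous to one offerService() pass
          } finally {
            lock.readLock().unlock();
          }
          Thread.sleep(10); // idle between passes, like the heartbeat interval
        }
      }

      static void reconfigure() {
        lock.writeLock().lock();
        try {
          // exclusive section, analogous to setHeartbeatResponse()
        } finally {
          lock.writeLock().unlock();
        }
      }
    }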

http://git-wip-us.apache.org/repos/asf/hadoop/blob/74a72385/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
index 06c6cf6..7ba0edc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
@@ -34,6 +34,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.TimeoutException;
 
+import net.jcip.annotations.NotThreadSafe;
 import org.apache.commons.collections.map.LinkedMap;
 import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.logging.Log;
@@ -81,6 +82,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.collect.HashMultimap;
 
+@NotThreadSafe
 public class TestShortCircuitCache {
   static final Log LOG = LogFactory.getLog(TestShortCircuitCache.class);
   
@@ -723,8 +725,8 @@ public class TestShortCircuitCache {
         throw new IOException("injected error into sendShmResponse");
       }
     }).when(failureInjector).sendShortCircuitShmResponse();
-    DataNodeFaultInjector prevInjector = DataNodeFaultInjector.instance;
-    DataNodeFaultInjector.instance = failureInjector;
+    DataNodeFaultInjector prevInjector = DataNodeFaultInjector.get();
+    DataNodeFaultInjector.set(failureInjector);
 
     try {
       // The first read will try to allocate a shared memory segment and slot.
@@ -741,7 +743,7 @@ public class TestShortCircuitCache {
         cluster.getDataNodes().get(0).getShortCircuitRegistry());
 
     LOG.info("Clearing failure injector and performing another read...");
-    DataNodeFaultInjector.instance = prevInjector;
+    DataNodeFaultInjector.set(prevInjector);
 
     fs.getClient().getClientContext().getDomainSocketFactory().clearPathMap();
 

