HDFS-10267. Extra "synchronized" on FsDatasetImpl#recoverAppend and FsDatasetImpl#recoverClose


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4bd7cbc2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4bd7cbc2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4bd7cbc2

Branch: refs/heads/HDFS-1312
Commit: 4bd7cbc29d142fc56324156333b9a8a7d7b68042
Parents: 3be1ab4
Author: Colin Patrick Mccabe <cmcc...@cloudera.com>
Authored: Wed Apr 6 12:36:54 2016 -0700
Committer: Colin Patrick Mccabe <cmcc...@cloudera.com>
Committed: Wed Apr 6 21:07:31 2016 -0700

----------------------------------------------------------------------
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |   4 +-
 .../hdfs/server/datanode/TestBlockRecovery.java | 234 ++++++++++++++-----
 2 files changed, 180 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
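
In short, the removed "synchronized" keywords mean the DataNode no longer holds the FsDatasetImpl monitor while it joins a slow writer thread during block recovery. A minimal sketch of the before/after locking pattern (names here are illustrative, not the actual FsDatasetImpl code):

// Illustrative sketch of the locking change; names are hypothetical and
// this is not the actual FsDatasetImpl code.
class RecoverySketch {
  private void updateReplicaState() {
    // mutate shared dataset state; this is the only part that needs the lock
  }

  // BEFORE: the dataset monitor is held for the entire method, including
  // the potentially very long join on the writer thread.
  public synchronized void recoverHoldingLock(Thread writer)
      throws InterruptedException {
    writer.interrupt();
    writer.join();          // every other dataset operation blocks meanwhile
    updateReplicaState();
  }

  // AFTER: stop the writer with no lock held, then synchronize only around
  // the shared-state update.
  public void recoverLocklessly(Thread writer) throws InterruptedException {
    writer.interrupt();
    writer.join();          // other dataset operations can proceed
    synchronized (this) {
      updateReplicaState();
    }
  }
}
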


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4bd7cbc2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 240345c..7e4e8eb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1268,7 +1268,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override  // FsDatasetSpi
-  public synchronized ReplicaHandler recoverAppend(
+  public ReplicaHandler recoverAppend(
       ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException {
     LOG.info("Recover failed append to " + b);
 
@@ -1301,7 +1301,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override // FsDatasetSpi
-  public synchronized Replica recoverClose(ExtendedBlock b, long newGS,
+  public Replica recoverClose(ExtendedBlock b, long newGS,
       long expectedBlockLen) throws IOException {
     LOG.info("Recover failed close " + b);
     while (true) {
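
For reference, a "synchronized" instance method in Java is equivalent to wrapping the entire method body in synchronized (this); deleting the keyword therefore releases the dataset monitor for the method's full duration, including the long writer join. A minimal illustration (hypothetical class, not from this patch):

// A synchronized instance method locks the object's monitor ("this") for
// its whole body; these two methods have identical locking behavior.
class MonitorEquivalence {
  private void doRecovery() { /* ... */ }

  public synchronized void recover() {
    doRecovery();
  }

  public void recoverExplicit() {
    synchronized (this) {
      doRecovery();
    }
  }
}
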

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4bd7cbc2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 751089f..42e80fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -44,8 +44,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.collect.Iterators;
 import org.apache.commons.logging.Log;
@@ -90,6 +92,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.Time;
 import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Assert;
@@ -161,7 +164,7 @@ public class TestBlockRecovery {
   }
 
   private final long
-      TEST_LOCK_HOG_DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS = 1000000000L;
+      TEST_STOP_WORKER_XCEIVER_STOP_TIMEOUT_MILLIS = 1000000000L;
 
   /**
    * Starts an instance of DataNode
@@ -175,11 +178,10 @@ public class TestBlockRecovery {
     conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0");
     conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
     conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0");
-    if (currentTestName.getMethodName().equals(
-        "testInitReplicaRecoveryDoesNotHogLock")) {
+    if (currentTestName.getMethodName().contains("DoesNotHoldLock")) {
       // This test requires a very long value for the xceiver stop timeout.
       conf.setLong(DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
-          TEST_LOCK_HOG_DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS);
+          TEST_STOP_WORKER_XCEIVER_STOP_TIMEOUT_MILLIS);
     }
     conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
     FileSystem.setDefaultUri(conf,
@@ -816,96 +818,216 @@ public class TestBlockRecovery {
     }
   }
 
+  private static class TestStopWorkerSemaphore {
+    final Semaphore sem;
+
+    final AtomicBoolean gotInterruption = new AtomicBoolean(false);
+
+    TestStopWorkerSemaphore() {
+      this.sem = new Semaphore(0);
+    }
+
+    /**
+     * Attempt to acquire a semaphore within a given timeout.
+     *
+     * This is useful for unit tests where we need to ignore InterruptedException
+     * when attempting to take a semaphore, but still want to honor the overall
+     * test timeout.
+     *
+     * @param timeoutMs   The timeout in milliseconds.
+     */
+    private void uninterruptiblyAcquire(long timeoutMs) throws Exception {
+      long startTimeMs = Time.monotonicNow();
+      while (true) {
+        long remTime = startTimeMs + timeoutMs - Time.monotonicNow();
+        if (remTime < 0) {
+          throw new RuntimeException("Failed to acquire the semaphore within " +
+              timeoutMs + " milliseconds.");
+        }
+        try {
+          if (sem.tryAcquire(1, remTime, TimeUnit.MILLISECONDS)) {
+            return;
+          }
+        } catch (InterruptedException e) {
+          gotInterruption.set(true);
+        }
+      }
+    }
+  }
+
+  private interface TestStopWorkerRunnable {
+    /**
+     * Return the name of the operation that this runnable performs.
+     */
+    String opName();
+
+    /**
+     * Perform the operation.
+     */
+    void run(RecoveringBlock recoveringBlock) throws Exception;
+  }
+
+  @Test(timeout=90000)
+  public void testInitReplicaRecoveryDoesNotHoldLock() throws Exception {
+    testStopWorker(new TestStopWorkerRunnable() {
+      @Override
+      public String opName() {
+        return "initReplicaRecovery";
+      }
+
+      @Override
+      public void run(RecoveringBlock recoveringBlock) throws Exception {
+        try {
+          spyDN.initReplicaRecovery(recoveringBlock);
+        } catch (Exception e) {
+          if (!e.getMessage().contains("meta does not exist")) {
+            throw e;
+          }
+        }
+      }
+    });
+  }
+
+  @Test(timeout=90000)
+  public void testRecoverAppendDoesNotHoldLock() throws Exception {
+    testStopWorker(new TestStopWorkerRunnable() {
+      @Override
+      public String opName() {
+        return "recoverAppend";
+      }
+
+      @Override
+      public void run(RecoveringBlock recoveringBlock) throws Exception {
+        try {
+          ExtendedBlock extBlock = recoveringBlock.getBlock();
+          spyDN.getFSDataset().recoverAppend(extBlock,
+              extBlock.getGenerationStamp() + 1, extBlock.getNumBytes());
+        } catch (Exception e) {
+          if (!e.getMessage().contains(
+              "Corrupted replica ReplicaBeingWritten")) {
+            throw e;
+          }
+        }
+      }
+    });
+  }
+
+  @Test(timeout=90000)
+  public void testRecoverCloseDoesNotHoldLock() throws Exception {
+    testStopWorker(new TestStopWorkerRunnable() {
+      @Override
+      public String opName() {
+        return "recoverClose";
+      }
+
+      @Override
+      public void run(RecoveringBlock recoveringBlock) throws Exception {
+        try {
+          ExtendedBlock extBlock = recoveringBlock.getBlock();
+          spyDN.getFSDataset().recoverClose(extBlock,
+              extBlock.getGenerationStamp() + 1, extBlock.getNumBytes());
+        } catch (Exception e) {
+          if (!e.getMessage().contains(
+              "Corrupted replica ReplicaBeingWritten")) {
+            throw e;
+          }
+        }
+      }
+    });
+  }
+
   /**
-   * Test that initReplicaRecovery does not hold the lock for an unreasonable
-   * amount of time if a writer is taking a long time to stop.
+   * Test that an FsDatasetImpl operation does not hold the lock for an
+   * unreasonable amount of time if a writer is taking a long time to stop.
    */
-  @Test(timeout=60000)
-  public void testInitReplicaRecoveryDoesNotHogLock() throws Exception {
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Running " + GenericTestUtils.getMethodName());
-    }
+  private void testStopWorker(final TestStopWorkerRunnable tswr)
+      throws Exception {
+    LOG.debug("Running " + currentTestName.getMethodName());
     // We need a long value for the data xceiver stop timeout.
     // Otherwise the timeout will trigger, and we will not have tested that
     // thread join was done locklessly.
     Assert.assertEquals(
-        TEST_LOCK_HOG_DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS,
+        TEST_STOP_WORKER_XCEIVER_STOP_TIMEOUT_MILLIS,
         dn.getDnConf().getXceiverStopTimeout());
-    final Semaphore progressParent = new Semaphore(0);
-    final Semaphore terminateSlowWorker = new Semaphore(0);
-    final AtomicBoolean failure = new AtomicBoolean(false);
+    final TestStopWorkerSemaphore progressParent =
+      new TestStopWorkerSemaphore();
+    final TestStopWorkerSemaphore terminateSlowWriter =
+      new TestStopWorkerSemaphore();
+    final AtomicReference<String> failure =
+        new AtomicReference<String>(null);
     Collection<RecoveringBlock> recoveringBlocks =
         initRecoveringBlocks();
     final RecoveringBlock recoveringBlock =
         Iterators.get(recoveringBlocks.iterator(), 0);
     final ExtendedBlock block = recoveringBlock.getBlock();
-    Thread slowWorker = new Thread(new Runnable() {
+    Thread slowWriterThread = new Thread(new Runnable() {
       @Override
       public void run() {
         try {
           // Register this thread as the writer for the recoveringBlock.
-          LOG.debug("slowWorker creating rbw");
+          LOG.debug("slowWriter creating rbw");
           ReplicaHandler replicaHandler =
               spyDN.data.createRbw(StorageType.DISK, block, false);
           replicaHandler.close();
-          LOG.debug("slowWorker created rbw");
+          LOG.debug("slowWriter created rbw");
           // Tell the parent thread to start progressing.
-          progressParent.release();
-          while (true) {
-            try {
-              terminateSlowWorker.acquire();
-              break;
-            } catch (InterruptedException e) {
-              // Ignore interrupted exceptions so that the waitingWorker thread
-              // will have to wait for us.
-            }
-          }
-          LOG.debug("slowWorker exiting");
+          progressParent.sem.release();
+          terminateSlowWriter.uninterruptiblyAcquire(60000);
+          LOG.debug("slowWriter exiting");
         } catch (Throwable t) {
-          LOG.error("slowWorker got exception", t);
-          failure.set(true);
+          LOG.error("slowWriter got exception", t);
+          failure.compareAndSet(null, "slowWriter got exception " +
+              t.getMessage());
         }
       }
     });
     // Start the slow worker thread and wait for it to take ownership of the
     // ReplicaInPipeline
-    slowWorker.start();
-    while (true) {
-      try {
-        progressParent.acquire();
-        break;
-      } catch (InterruptedException e) {
-        // Ignore interrupted exceptions
-      }
-    }
+    slowWriterThread.start();
+    progressParent.uninterruptiblyAcquire(60000);
 
-    // Start a worker thread which will wait for the slow worker thread.
-    Thread waitingWorker = new Thread(new Runnable() {
+    // Start a worker thread which will attempt to stop the writer.
+    Thread stopWriterThread = new Thread(new Runnable() {
       @Override
       public void run() {
         try {
-          // Attempt to terminate the other worker thread and take ownership
-          // of the ReplicaInPipeline.
-          LOG.debug("waitingWorker initiating recovery");
-          spyDN.initReplicaRecovery(recoveringBlock);
-          LOG.debug("waitingWorker initiated recovery");
+          LOG.debug("initiating " + tswr.opName());
+          tswr.run(recoveringBlock);
+          LOG.debug("finished " + tswr.opName());
         } catch (Throwable t) {
-          GenericTestUtils.assertExceptionContains("meta does not exist", t);
+          LOG.error("stopWriterThread got unexpected exception for " +
+              tswr.opName(), t);
+          failure.compareAndSet(null, "stopWriterThread got unexpected " +
+              "exception for " + tswr.opName() + ": " + t.getMessage());
         }
       }
     });
-    waitingWorker.start();
+    stopWriterThread.start();
 
-    // Do an operation that requires the lock.  This should not be blocked
-    // by the replica recovery in progress.
+    while (!terminateSlowWriter.gotInterruption.get()) {
+      // Wait until stopWriterThread attempts to stop our slow writer by sending
+      // it an InterruptedException.
+      Thread.sleep(1);
+    }
+
+    // We know that stopWriterThread is in the process of joining our slow
+    // writer.  It must not hold the lock during this operation.
+    // In order to test that it does not, we attempt to do an operation that
+    // requires the lock-- getReplicaString.
     spyDN.getFSDataset().getReplicaString(
         recoveringBlock.getBlock().getBlockPoolId(),
         recoveringBlock.getBlock().getBlockId());
 
-    // Wait for the two worker threads to exit normally.
-    terminateSlowWorker.release();
-    slowWorker.join();
-    waitingWorker.join();
-    Assert.assertFalse("The slowWriter thread failed.", failure.get());
+    // Tell the slow writer to exit, and then wait for all threads to join.
+    terminateSlowWriter.sem.release();
+    slowWriterThread.join();
+    stopWriterThread.join();
+
+    // Check that our worker threads exited cleanly.  This is not checked by the
+    // unit test framework, so we have to do it manually here.
+    String failureReason = failure.get();
+    if (failureReason != null) {
+      Assert.fail("Thread failure: " + failureReason);
+    }
   }
 }
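
The test harness's core idea, distilled: the slow writer swallows interrupts, so the first recorded interrupt proves that stopWriterThread has begun interrupting and joining it; at that exact moment the main thread performs an operation that needs the dataset lock (getReplicaString), which can only complete promptly if the join is lockless. A condensed, self-contained model of the same three-thread choreography (hypothetical names, plain Java; not the actual test):

import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Condensed model of the test: writer ignores interrupts, stopper interrupts
// and joins it, and the main thread proves the shared lock stays free while
// that join is in progress.
public class LocklessJoinSketch {
  private final AtomicBoolean sawInterrupt = new AtomicBoolean(false);
  private final Semaphore letWriterExit = new Semaphore(0);

  // Stands in for getReplicaString(): any short op that needs the lock.
  private synchronized void lockRequiringOp() { }

  // Stands in for the fixed recovery methods: join outside the lock.
  private void stopWriterLocklessly(Thread writer) throws InterruptedException {
    writer.interrupt();
    writer.join();                    // the monitor is NOT held here
    synchronized (this) { /* shared-state update would go here */ }
  }

  public void run() throws Exception {
    Thread writer = new Thread(() -> {
      while (true) {
        try {
          letWriterExit.acquire();    // block until told to exit
          return;
        } catch (InterruptedException e) {
          sawInterrupt.set(true);     // remember the interrupt, keep waiting
        }
      }
    });
    writer.start();
    Thread stopper = new Thread(() -> {
      try {
        stopWriterLocklessly(writer);
      } catch (InterruptedException ignored) { }
    });
    stopper.start();
    while (!sawInterrupt.get()) {
      Thread.sleep(1);                // wait until the join is in progress
    }
    lockRequiringOp();                // succeeds only if the join is lockless
    letWriterExit.release();          // let the writer exit
    writer.join();
    stopper.join();
  }

  public static void main(String[] args) throws Exception {
    new LocklessJoinSketch().run();
  }
}

If stopWriterLocklessly held the monitor across the join, lockRequiringOp() would block until the writer exited, and the writer cannot exit until the main thread releases it: the test would deadlock (and hit its timeout) instead of passing.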
