This is an automated email from the ASF dual-hosted git repository.

kihwal pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 232e9f8  HDFS-15147. LazyPersistTestCase wait logic is error-prone. Contributed by Ahmed Hussein.
232e9f8 is described below

commit 232e9f8ee117a29a2f8f1360bebd8d0e8def826a
Author: Kihwal Lee <kih...@apache.org>
AuthorDate: Thu Feb 27 09:58:44 2020 -0600

    HDFS-15147. LazyPersistTestCase wait logic is error-prone. Contributed
    by Ahmed Hussein.
---
 .../hdfs/server/blockmanagement/BlockManager.java  |  11 +-
 .../hadoop/hdfs/server/namenode/FSNamesystem.java  |  18 +-
 .../java/org/apache/hadoop/hdfs/DFSTestUtil.java   |   2 +-
 .../fsdataset/impl/LazyPersistTestCase.java        | 234 ++++++++++++++++++---
 .../fsdataset/impl/TestLazyPersistFiles.java       |  77 +++----
 .../impl/TestLazyPersistReplicaPlacement.java      |   2 +-
 .../datanode/fsdataset/impl/TestLazyWriter.java    |   6 +-
 7 files changed, 260 insertions(+), 90 deletions(-)
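
The thrust of the patch: fixed Thread.sleep() calls in the lazy-persist
tests are replaced by bounded, condition-based polling. The recurring idiom
is GenericTestUtils.waitFor(), which re-evaluates a condition at a short
interval and throws TimeoutException if it never holds. A minimal sketch of
the idiom (the condition() call and the intervals are illustrative, not
part of the patch):

    // Poll instead of sleeping: re-check every 10 ms, give up after 30 s.
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        return condition(); // illustrative placeholder; true ends the wait
      }
    }, 10, 30 * 1000);

This removes both failure modes of a fixed sleep: sleeping too little makes
the test flaky, and sleeping too long makes it slow.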

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index fd8739e..5addf5a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -46,9 +46,7 @@ import java.util.concurrent.FutureTask;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-
 import javax.management.ObjectName;
-
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -233,6 +231,8 @@ public class BlockManager implements BlockStatsMXBean {
 
   /** Replication thread. */
   final Daemon replicationThread = new Daemon(new ReplicationMonitor());
+  /** Timestamp of the last full cycle of the replication monitor. */
+  private final AtomicLong lastReplicationCycleTS = new AtomicLong(-1);
   
   /** Block report thread for handling async reports. */
   private final BlockReportProcessingThread blockReportThread =
@@ -3986,11 +3986,15 @@ public class BlockManager implements BlockStatsMXBean {
     return neededReplications.size();
   }
 
+  @VisibleForTesting
+  public long getLastReplicationCycleTS() {
+    return lastReplicationCycleTS.get();
+  }
+
   /**
    * Periodically calls computeReplicationWork().
    */
   private class ReplicationMonitor implements Runnable {
-
     @Override
     public void run() {
       while (namesystem.isRunning()) {
@@ -4000,6 +4004,7 @@ public class BlockManager implements BlockStatsMXBean {
             computeDatanodeWork();
             processPendingReplications();
             rescanPostponedMisreplicatedBlocks();
+            lastReplicationCycleTS.set(Time.monotonicNow());
           }
           Thread.sleep(replicationRecheckInterval);
         } catch (Throwable t) {
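
With this timestamp, a test can wait for the replication monitor to finish
at least one full pass instead of sleeping for a multiple of the recheck
interval. A sketch of the intended use, assuming a running MiniDFSCluster
named cluster (it mirrors the waitForRedundancyMonitorCycle() helper added
later in this patch):

    final BlockManager bm = cluster.getNamesystem().getBlockManager();
    final long before = bm.getLastReplicationCycleTS();
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        // Time.monotonicNow() never goes backwards, so any change means
        // a complete monitor cycle has finished since "before".
        return bm.getLastReplicationCycleTS() != before;
      }
    }, 20, 10 * 1000);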
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index f0af5b4..11ac3fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -87,9 +87,12 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROU
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
+
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SUPPORT_APPEND_KEY;
-import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER;
+
+import java.util.concurrent.atomic.AtomicLong;
+
 import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*;
 import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState.ACTIVE;
 import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState.OBSERVER;
@@ -294,6 +297,7 @@ import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.log4j.Appender;
 import org.apache.log4j.AsyncAppender;
@@ -450,6 +454,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   // A daemon to periodically clean up corrupt lazyPersist files
   // from the name space.
   Daemon lazyPersistFileScrubber = null;
+  private final AtomicLong lazyPersistFileScrubberTS = new AtomicLong(0);
+
 
   // Executor to warm up EDEK cache
   private ExecutorService edekCacheLoader = null;
@@ -603,6 +609,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return leaseManager;
   }
 
+  @VisibleForTesting
+  public long getLazyPersistFileScrubberTS() {
+    return lazyPersistFileScrubber == null ? -1
+        : lazyPersistFileScrubberTS.get();
+  }
+
   public boolean isHaEnabled() {
     return haEnabled;
   }
@@ -3943,6 +3955,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         try {
           if (!isInSafeMode()) {
             clearCorruptLazyPersistFiles();
+            // Record the timestamp of the last completed cycle.
+            lazyPersistFileScrubberTS.set(Time.monotonicNow());
           } else {
             if (FSNamesystem.LOG.isDebugEnabled()) {
               FSNamesystem.LOG
@@ -3953,7 +3967,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           FSNamesystem.LOG.error(
               "Ignoring exception in LazyPersistFileScrubber:", e);
         }
-
         try {
           Thread.sleep(scrubIntervalSec * 1000);
         } catch (InterruptedException e) {
@@ -7183,4 +7196,3 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
 
 }
-
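
Note the sentinel in the new getter: getLazyPersistFileScrubberTS() returns
-1 when the scrubber daemon was never started, so callers can skip the wait
entirely instead of timing out. A sketch of the intended call pattern,
mirroring the waitForScrubberCycle() helper added later in this patch (the
interval constant is the one defined in LazyPersistTestCase):

    final FSNamesystem fsn = cluster.getNamesystem();
    final long last = fsn.getLazyPersistFileScrubberTS();
    if (last != -1) { // -1 means the scrubber is disabled
      GenericTestUtils.waitFor(new Supplier<Boolean>() {
        @Override
        public Boolean get() {
          return fsn.getLazyPersistFileScrubberTS() != last;
        }
      }, 40, 2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000);
    }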
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index f81e90e3..bee3314 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1909,7 +1909,7 @@ public class DFSTestUtil {
          throw new UnhandledException("Test failed due to unexpected exception", e);
         }
       }
-    }, 1000, 60000);
+    }, 50, 60000);
   }
 
   public static StorageReceivedDeletedBlocks[] makeReportForReceivedBlock(
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
index 13ea940..ef34222 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 import com.google.common.base.Supplier;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 
+
 import static org.apache.hadoop.fs.CreateFlag.CREATE;
 import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
 import static org.apache.hadoop.fs.StorageType.DEFAULT;
@@ -36,15 +37,27 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeoutException;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -81,16 +94,28 @@ public abstract class LazyPersistTestCase {
     GenericTestUtils.setLogLevel(FsDatasetImpl.LOG, Level.DEBUG);
   }
 
+  protected static final Logger LOG =
+      LoggerFactory.getLogger(LazyPersistTestCase.class);
   protected static final int BLOCK_SIZE = 5 * 1024 * 1024;
   protected static final int BUFFER_LENGTH = 4096;
-  private static final long HEARTBEAT_INTERVAL_SEC = 1;
-  private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
-  private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
-  private static final String JMX_SERVICE_NAME = "DataNode";
   protected static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
   protected static final int LAZY_WRITER_INTERVAL_SEC = 1;
-  protected static final Log LOG = LogFactory.getLog(LazyPersistTestCase.class);
   protected static final short REPL_FACTOR = 1;
+  private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
+  private static final String JMX_SERVICE_NAME = "DataNode";
+  private static final long HEARTBEAT_INTERVAL_SEC = 1;
+  private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
+  private static final int WAIT_FOR_FBR_MS = 10 * 1000;
+  private static final int WAIT_FOR_STORAGE_TYPES_MS = 30 * 1000;
+  private static final int WAIT_FOR_ASYNC_DELETE_MS = 10 * 1000;
+  private static final int WAIT_FOR_DN_SHUTDOWN_MS = 30 * 1000;
+  private static final int WAIT_FOR_REDUNDANCY_MS =
+      2 * DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT * 1000;
+  private static final int WAIT_FOR_LAZY_SCRUBBER_MS =
+      2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000;
+  private static final int WAIT_POLL_INTERVAL_MS = 10;
+  private static final int WAIT_POLL_INTERVAL_LARGE_MS = 20;
+
   protected final long osPageSize =
       NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();
 
@@ -154,7 +179,7 @@ public abstract class LazyPersistTestCase {
           return false;
         }
       }
-    }, 100, 30 * 1000);
+    }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_STORAGE_TYPES_MS);
 
     return client.getLocatedBlocks(path.toString(), 0, fileLength);
   }
@@ -429,11 +454,38 @@ public abstract class LazyPersistTestCase {
     private boolean disableScrubber=false;
   }
 
+  /**
+   * Force an FBR (full block report) on all the datanodes.
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
   protected final void triggerBlockReport()
-      throws IOException, InterruptedException {
+      throws IOException, InterruptedException, TimeoutException {
     // Trigger block report to NN
-    DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
-    Thread.sleep(10 * 1000);
+    final Map<DatanodeStorageInfo, Integer> reportCounts = new HashMap<>();
+    final FSNamesystem fsn = cluster.getNamesystem();
+    for (DataNode dn : cluster.getDataNodes()) {
+      final DatanodeDescriptor dnd =
+          NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
+      final DatanodeStorageInfo storage = dnd.getStorageInfos()[0];
+      reportCounts.put(storage, storage.getBlockReportCount());
+      DataNodeTestUtils.triggerBlockReport(dn);
+    }
+    // wait for block reports to be received.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        for (Entry<DatanodeStorageInfo, Integer> repCntEntry : reportCounts
+            .entrySet()) {
+          if (repCntEntry.getValue() == repCntEntry.getKey()
+              .getBlockReportCount()) {
+            return false;
+          }
+        }
+        return true;
+      }
+    }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_FBR_MS);
   }
 
   protected final boolean verifyBlockDeletedFromDir(File dir,
@@ -445,51 +497,65 @@ public abstract class LazyPersistTestCase {
 
       File blockFile = new File(targetDir, lb.getBlock().getBlockName());
       if (blockFile.exists()) {
-        LOG.warn("blockFile: " + blockFile.getAbsolutePath() +
-          " exists after deletion.");
         return false;
       }
       File metaFile = new File(targetDir,
         DatanodeUtil.getMetaName(lb.getBlock().getBlockName(),
           lb.getBlock().getGenerationStamp()));
       if (metaFile.exists()) {
-        LOG.warn("metaFile: " + metaFile.getAbsolutePath() +
-          " exists after deletion.");
         return false;
       }
     }
     return true;
   }
 
-  protected final boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
-      throws IOException, InterruptedException {
+  protected final boolean verifyDeletedBlocks(final LocatedBlocks locatedBlocks)
+      throws Exception {
 
     LOG.info("Verifying replica has no saved copy after deletion.");
     triggerBlockReport();
+    final DataNode dn = cluster.getDataNodes().get(0);
 
-    while(
-        cluster.getFsDatasetTestUtils(0).getPendingAsyncDeletions()
-        > 0L){
-      Thread.sleep(1000);
-    }
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        for (DataNode dn1 : cluster.getDataNodes()) {
+          if (cluster.getFsDatasetTestUtils(dn1).getPendingAsyncDeletions()
+              > 0) {
+            return false;
+          }
+        }
+        return true;
+      }
+    }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_ASYNC_DELETE_MS);
 
     final String bpid = cluster.getNamesystem().getBlockPoolId();
-    final FsDatasetSpi<?> dataset =
-        cluster.getDataNodes().get(0).getFSDataset();
-
+    final FsDatasetSpi<?> dataset = dn.getFSDataset();
     // Make sure deleted replica does not have a copy on either finalized dir of
-    // transient volume or finalized dir of non-transient volume
+    // transient volume or finalized dir of non-transient volume.
+    // We need to wait until the async deletion is scheduled.
     try (FsDatasetSpi.FsVolumeReferences volumes =
         dataset.getFsVolumeReferences()) {
-      for (FsVolumeSpi vol : volumes) {
-        FsVolumeImpl volume = (FsVolumeImpl) vol;
-        File targetDir = (volume.isTransientStorage()) ?
-            volume.getBlockPoolSlice(bpid).getFinalizedDir() :
-            volume.getBlockPoolSlice(bpid).getLazypersistDir();
-        if (verifyBlockDeletedFromDir(targetDir, locatedBlocks) == false) {
-          return false;
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          try {
+            for (FsVolumeSpi vol : volumes) {
+              FsVolumeImpl volume = (FsVolumeImpl) vol;
+              File targetDir = (volume.isTransientStorage()) ?
+                  volume.getBlockPoolSlice(bpid).getFinalizedDir() :
+                  volume.getBlockPoolSlice(bpid).getLazypersistDir();
+              if (!LazyPersistTestCase.this
+                  .verifyBlockDeletedFromDir(targetDir, locatedBlocks)) {
+                return false;
+              }
+            }
+            return true;
+          } catch (IOException ie) {
+            return false;
+          }
         }
-      }
+      }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_ASYNC_DELETE_MS);
     }
     return true;
   }
@@ -534,4 +600,104 @@ public abstract class LazyPersistTestCase {
     FsDatasetImpl fsDataset = (FsDatasetImpl) dn.getFSDataset();
     fsDataset.evictLazyPersistBlocks(Long.MAX_VALUE); // Run one eviction cycle.
   }
+
+  /**
+   * Shut down the DataNodes and wait for the NN to detect the DNs as
+   * being dead.
+   */
+  protected void shutdownDataNodes()
+      throws TimeoutException, InterruptedException {
+    cluster.shutdownDataNodes();
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          DatanodeInfo[] info = client.datanodeReport(
+              HdfsConstants.DatanodeReportType.LIVE);
+          return info.length == 0;
+        } catch (IOException e) {
+          return false;
+        }
+      }
+    }, WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_DN_SHUTDOWN_MS);
+  }
+
+  protected void waitForCorruptBlock(final long corruptCnt)
+      throws TimeoutException, InterruptedException {
+    // wait for the redundancy monitor to mark the file as corrupt.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        Iterator<BlockInfo> bInfoIter = cluster.getNameNode()
+            .getNamesystem().getBlockManager().getCorruptReplicaBlockIterator();
+        int count = 0;
+        while (bInfoIter.hasNext()) {
+          bInfoIter.next();
+          count++;
+        }
+        return corruptCnt == count;
+      }
+    }, 2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_REDUNDANCY_MS);
+  }
+
+  protected void waitForScrubberCycle()
+      throws TimeoutException, InterruptedException {
+    // wait for the lazy persist file scrubber to complete a cycle.
+    final FSNamesystem fsn = cluster.getNamesystem();
+    final long lastTimeStamp = fsn.getLazyPersistFileScrubberTS();
+    if (lastTimeStamp == -1) { // scrubber is disabled
+      return;
+    }
+    GenericTestUtils.waitFor(
+        new Supplier<Boolean>() {
+          @Override
+          public Boolean get() {
+            return lastTimeStamp != fsn.getLazyPersistFileScrubberTS();
+          }
+        }, 2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_LAZY_SCRUBBER_MS);
+  }
+
+  protected void waitForRedundancyMonitorCycle()
+      throws TimeoutException, InterruptedException {
+    // wait for the redundancy monitor to complete a full cycle.
+    final BlockManager bm = cluster.getNamesystem().getBlockManager();
+    final long lastRedundancyTS =
+        bm.getLastReplicationCycleTS();
+
+    GenericTestUtils.waitFor(
+        new Supplier<Boolean>() {
+          @Override
+          public Boolean get() {
+            return lastRedundancyTS != bm.getLastReplicationCycleTS();
+          }
+        },
+        2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_REDUNDANCY_MS);
+  }
+
+  protected void waitForRedundancyCount(final long cnt)
+      throws TimeoutException, InterruptedException {
+    final BlockManager bm = cluster.getNamesystem().getBlockManager();
+
+    GenericTestUtils.waitFor(
+        new Supplier<Boolean>() {
+          @Override
+          public Boolean get() {
+            return cnt == bm.getUnderReplicatedBlocksCount();
+          }
+        }, 2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_REDUNDANCY_MS);
+  }
+
+  protected void waitForFile(final Path path, final boolean expected)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          return expected == fs.exists(path);
+        } catch (IOException e) {
+          return false;
+        }
+      }
+    }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_STORAGE_TYPES_MS);
+  }
 }
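
Taken together, these helpers let the subclass tests replace bare
assertions on asynchronous state with bounded polls. For example (the
paths are illustrative), where a test previously asserted right after a
scrubber cycle:

    // Before: assert immediately and hope the scrubber has already run.
    //   Assert.assertTrue(fs.exists(path1));
    // After: poll until the namespace reflects the expected state.
    waitForFile(path1, true);  // expect path1 to exist
    waitForFile(path2, false); // expect path2 to be gone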
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index 8c43592..b177a7b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -16,8 +16,6 @@
  * limitations under the License.
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
-import com.google.common.collect.Iterators;
-import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -33,7 +31,6 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.hadoop.fs.StorageType.RAM_DISK;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
@@ -98,28 +95,20 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
     makeTestFile(path1, BLOCK_SIZE, true);
     ensureFileReplicasOnStorageType(path1, RAM_DISK);
 
-    // Stop the DataNode and sleep for the time it takes the NN to
-    // detect the DN as being dead.
-    cluster.shutdownDataNodes();
-    Thread.sleep(30000L);
+    // Stop the DataNode.
+    shutdownDataNodes();
     assertThat(cluster.getNamesystem().getNumDeadDataNodes(), is(1));
 
-    // Next, wait for the replication monitor to mark the file as corrupt
-    Thread.sleep(2 * DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT * 1000);
-
+    // Next, wait for the redundancy monitor to mark the file as corrupt.
+    waitForRedundancyMonitorCycle();
     // Wait for the LazyPersistFileScrubber to run
-    Thread.sleep(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000);
-
+    waitForScrubberCycle();
     // Ensure that path1 does not exist anymore, whereas path2 does.
-    assert(!fs.exists(path1));
+    waitForFile(path1, false);
 
     // We should have zero blocks that needs replication i.e. the one
-    // belonging to path2.
-    assertThat(cluster.getNameNode()
-                      .getNamesystem()
-                      .getBlockManager()
-                      .getUnderReplicatedBlocksCount(),
-               is(0L));
+    // belonging to path2. This needs a wait.
+    waitForRedundancyCount(0L);
   }
 
   @Test
@@ -134,18 +123,14 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
 
     // Stop the DataNode and sleep for the time it takes the NN to
     // detect the DN as being dead.
-    cluster.shutdownDataNodes();
-    Thread.sleep(30000L);
-
-    // Next, wait for the replication monitor to mark the file as corrupt
-    Thread.sleep(2 * DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT * 1000);
+    shutdownDataNodes();
 
+    // wait for the redundancy monitor to mark the file as corrupt.
+    waitForCorruptBlock(1L);
     // Wait for the LazyPersistFileScrubber to run
-    Thread.sleep(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000);
-
+    waitForScrubberCycle();
     // Ensure that path1 exist.
-    Assert.assertTrue(fs.exists(path1));
-
+    waitForFile(path1, true);
   }
 
  /**
@@ -160,21 +145,14 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
     makeTestFile(path1, BLOCK_SIZE, true);
     ensureFileReplicasOnStorageType(path1, RAM_DISK);
 
-    cluster.shutdownDataNodes();
+    shutdownDataNodes();
 
     cluster.restartNameNodes();
 
-    // wait for the replication monitor to mark the file as corrupt
-    Thread.sleep(2 * DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT * 1000);
-
-    Long corruptBlkCount = (long) Iterators.size(cluster.getNameNode()
-        .getNamesystem().getBlockManager().getCorruptReplicaBlockIterator());
-
-    // Check block detected as corrupted
-    assertThat(corruptBlkCount, is(1L));
-
+    // wait for the redundancy monitor to mark the file as corrupt.
+    waitForCorruptBlock(1L);
     // Ensure path1 exist.
-    Assert.assertTrue(fs.exists(path1));
+    waitForFile(path1, true);
   }
 
   /**
@@ -216,10 +194,19 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
       threads[i].start();
     }
 
-    Thread.sleep(500);
-
     for (int i = 0; i < NUM_TASKS; i++) {
-      Uninterruptibles.joinUninterruptibly(threads[i]);
+      boolean interrupted = false;
+      while (true) {
+        try {
+          threads[i].join();
+          break;
+        } catch (InterruptedException e) {
+          interrupted = true;
+        }
+      }
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
     }
     Assert.assertFalse(testFailed.get());
   }
@@ -233,7 +220,7 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
    */
   @Test
   public void testConcurrentWrites()
-    throws IOException, InterruptedException {
+      throws IOException, InterruptedException, TimeoutException {
     getClusterBuilder().setRamDiskReplicaCapacity(9).build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     final int SEED = 0xFADED;
@@ -282,11 +269,11 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
       this.seed = seed;
       this.latch = latch;
       this.bFail = bFail;
-      System.out.println("Creating Writer: " + id);
+      LOG.info("Creating Writer: " + id);
     }
 
     public void run() {
-      System.out.println("Writer " + id + " starting... ");
+      LOG.info("Writer " + id + " starting... ");
       int i = 0;
       try {
         for (i = 0; i < paths.length; i++) {
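
The inlined join loop added above reproduces the semantics of Guava's
Uninterruptibles.joinUninterruptibly(thread) without the Guava dependency.
If more tests end up needing it, it could be extracted into a helper along
these lines (a sketch; the method name is hypothetical, not part of the
patch):

    /** Join a thread, deferring any interrupt until the join completes. */
    private static void joinUninterruptibly(Thread thread) {
      boolean interrupted = false;
      try {
        while (true) {
          try {
            thread.join();
            return;
          } catch (InterruptedException e) {
            interrupted = true; // remember the interrupt, keep waiting
          }
        }
      } finally {
        if (interrupted) {
          Thread.currentThread().interrupt(); // restore interrupt status
        }
      }
    }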
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java
index c16dbe5..b6413ec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java
@@ -119,7 +119,7 @@ public class TestLazyPersistReplicaPlacement extends LazyPersistTestCase {
    */
   @Test
   public void testFallbackToDiskPartial()
-      throws IOException, InterruptedException {
+      throws IOException, InterruptedException, TimeoutException {
     getClusterBuilder().setMaxLockedMemory(2 * BLOCK_SIZE).build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java
index 1680764..56cc41e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -156,7 +157,6 @@ public class TestLazyWriter extends LazyPersistTestCase {
     for (int i = 0; i < NUM_PATHS; ++i) {
       makeTestFile(paths[i + NUM_PATHS], BLOCK_SIZE, true);
       triggerBlockReport();
-      Thread.sleep(3000);
       ensureFileReplicasOnStorageType(paths[i + NUM_PATHS], RAM_DISK);
       ensureFileReplicasOnStorageType(paths[indexes.get(i)], DEFAULT);
       for (int j = i + 1; j < NUM_PATHS; ++j) {
@@ -183,13 +183,13 @@ public class TestLazyWriter extends LazyPersistTestCase {
       throws Exception {
     getClusterBuilder().build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
-    FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
+    final DataNode dn = cluster.getDataNodes().get(0);
+    FsDatasetTestUtil.stopLazyWriter(dn);
 
     Path path = new Path("/" + METHOD_NAME + ".dat");
     makeTestFile(path, BLOCK_SIZE, true);
     LocatedBlocks locatedBlocks =
         ensureFileReplicasOnStorageType(path, RAM_DISK);
-
     // Delete before persist
     client.delete(path.toString(), false);
     Assert.assertFalse(fs.exists(path));
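
The Thread.sleep(3000) deleted in the first TestLazyWriter hunk is safe to
drop because the reworked triggerBlockReport() in LazyPersistTestCase
records each storage's getBlockReportCount() before triggering and then
polls until every count has changed; when it returns, the NN has processed
the reports. In sketch form:

    // Before (flaky): trigger, then sleep and hope the NN caught up.
    triggerBlockReport();
    Thread.sleep(3000);

    // After: triggerBlockReport() itself waits for the block report
    // counts to advance, so no extra sleep is needed.
    triggerBlockReport();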

