This is an automated email from the ASF dual-hosted git repository.

kihwal pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 232e9f8  HDFS-15147. LazyPersistTestCase wait logic is error-prone. Contributed by Ahmed Hussein.
232e9f8 is described below

commit 232e9f8ee117a29a2f8f1360bebd8d0e8def826a
Author: Kihwal Lee <kih...@apache.org>
AuthorDate: Thu Feb 27 09:58:44 2020 -0600

    HDFS-15147. LazyPersistTestCase wait logic is error-prone. Contributed by Ahmed Hussein.
---
 .../hdfs/server/blockmanagement/BlockManager.java |  11 +-
 .../hadoop/hdfs/server/namenode/FSNamesystem.java |  18 +-
 .../java/org/apache/hadoop/hdfs/DFSTestUtil.java  |   2 +-
 .../fsdataset/impl/LazyPersistTestCase.java       | 234 ++++++++++++++++++---
 .../fsdataset/impl/TestLazyPersistFiles.java      |  77 +++----
 .../impl/TestLazyPersistReplicaPlacement.java     |   2 +-
 .../datanode/fsdataset/impl/TestLazyWriter.java   |   6 +-
 7 files changed, 260 insertions(+), 90 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index fd8739e..5addf5a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -46,9 +46,7 @@ import java.util.concurrent.FutureTask;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-
 import javax.management.ObjectName;
-
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -233,6 +231,8 @@ public class BlockManager implements BlockStatsMXBean {

   /** Replication thread. */
   final Daemon replicationThread = new Daemon(new ReplicationMonitor());
+  /** Timestamp for the last cycle of the redundancy thread. */
+  private final AtomicLong lastReplicationCycleTS = new AtomicLong(-1);

   /** Block report thread for handling async reports. */
   private final BlockReportProcessingThread blockReportThread =
@@ -3986,11 +3986,15 @@ public class BlockManager implements BlockStatsMXBean {
     return neededReplications.size();
   }

+  @VisibleForTesting
+  public long getLastReplicationCycleTS() {
+    return lastReplicationCycleTS.get();
+  }
+
   /**
    * Periodically calls computeReplicationWork().
    */
   private class ReplicationMonitor implements Runnable {
-
     @Override
     public void run() {
       while (namesystem.isRunning()) {
@@ -4000,6 +4004,7 @@ public class BlockManager implements BlockStatsMXBean {
           computeDatanodeWork();
           processPendingReplications();
           rescanPostponedMisreplicatedBlocks();
+          lastReplicationCycleTS.set(Time.monotonicNow());
         }
         Thread.sleep(replicationRecheckInterval);
       } catch (Throwable t) {
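The BlockManager change above is the heart of the fix: rather than having tests sleep for a guessed multiple of the replication interval, the ReplicationMonitor stamps a monotonic timestamp after each pass and exposes it through a @VisibleForTesting getter. The standalone sketch below restates that pattern outside Hadoop; the class and method names (CycleStampedMonitor, waitForNextCycle) are illustrative only and are not part of this commit.

import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

public class CycleStampedMonitor {

  // -1 mirrors the patch's initial value: "no cycle has completed yet".
  private final AtomicLong lastCycleTS = new AtomicLong(-1);

  public long getLastCycleTS() {
    return lastCycleTS.get();
  }

  // Called by the periodic daemon after each pass over its work queue.
  public void runOneCycle() {
    // ... periodic work goes here ...
    lastCycleTS.set(System.nanoTime()); // stamp the completed cycle
  }

  // Block until at least one full cycle completes after this call.
  public void waitForNextCycle(long timeoutMs)
      throws InterruptedException, TimeoutException {
    final long observed = getLastCycleTS();
    final long deadline = System.currentTimeMillis() + timeoutMs;
    while (getLastCycleTS() == observed) {
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException("monitor did not complete a cycle");
      }
      Thread.sleep(10); // short poll, like WAIT_POLL_INTERVAL_MS below
    }
  }
}

Waiting for the stamp to change is robust against both slow daemons (the wait extends up to the timeout) and fast ones (it returns as soon as a cycle lands), which a fixed-length Thread.sleep() cannot do.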
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index f0af5b4..11ac3fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -87,9 +87,12 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROU
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
+
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SUPPORT_APPEND_KEY;
-import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER;
+
+import java.util.concurrent.atomic.AtomicLong;
+
 import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*;
 import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState.ACTIVE;
 import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState.OBSERVER;
@@ -294,6 +297,7 @@ import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.log4j.Appender;
 import org.apache.log4j.AsyncAppender;
@@ -450,6 +454,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   // A daemon to periodically clean up corrupt lazyPersist files
   // from the name space.
   Daemon lazyPersistFileScrubber = null;
+  private final AtomicLong lazyPersistFileScrubberTS = new AtomicLong(0);
+
   // Executor to warm up EDEK cache
   private ExecutorService edekCacheLoader = null;
@@ -603,6 +609,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return leaseManager;
   }

+  @VisibleForTesting
+  public long getLazyPersistFileScrubberTS() {
+    return lazyPersistFileScrubber == null ? -1
+        : lazyPersistFileScrubberTS.get();
+  }
+
   public boolean isHaEnabled() {
     return haEnabled;
   }
@@ -3943,6 +3955,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         try {
           if (!isInSafeMode()) {
             clearCorruptLazyPersistFiles();
+            // Set the timestamp of the last completed scrubber cycle.
+            lazyPersistFileScrubberTS.set(Time.monotonicNow());
           } else {
             if (FSNamesystem.LOG.isDebugEnabled()) {
               FSNamesystem.LOG
@@ -3953,7 +3967,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           FSNamesystem.LOG.error(
               "Ignoring exception in LazyPersistFileScrubber:", e);
         }
-
         try {
           Thread.sleep(scrubIntervalSec * 1000);
         } catch (InterruptedException e) {
@@ -7183,4 +7196,3 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
   }
 }
-
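getLazyPersistFileScrubberTS() above adds one twist to the same pattern: it returns -1 when the scrubber daemon was never started, so a waiter can skip the wait instead of timing out against a timestamp that will never advance (waitForScrubberCycle() in LazyPersistTestCase below checks exactly this). A minimal restatement of that convention, with hypothetical names:

import java.util.concurrent.atomic.AtomicLong;

class ScrubberHandle {

  private final Thread scrubber; // null when scrubbing is disabled
  private final AtomicLong lastCycleTS = new AtomicLong(0);

  ScrubberHandle(Thread scrubber) {
    this.scrubber = scrubber;
  }

  // -1 is the "disabled" sentinel; callers must not wait on the
  // timestamp in that case.
  long getScrubberCycleTS() {
    return scrubber == null ? -1 : lastCycleTS.get();
  }
}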
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index f81e90e3..bee3314 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1909,7 +1909,7 @@ public class DFSTestUtil {
           throw new UnhandledException("Test failed due to unexpected exception", e);
         }
       }
-    }, 1000, 60000);
+    }, 50, 60000);
   }

   public static StorageReceivedDeletedBlocks[] makeReportForReceivedBlock(
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
index 13ea940..ef34222 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

 import com.google.common.base.Supplier;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+
 import static org.apache.hadoop.fs.CreateFlag.CREATE;
 import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
 import static org.apache.hadoop.fs.StorageType.DEFAULT;
@@ -36,15 +37,27 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeoutException;

 import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -81,16 +94,28 @@ public abstract class LazyPersistTestCase {
     GenericTestUtils.setLogLevel(FsDatasetImpl.LOG, Level.DEBUG);
   }

+  protected static final Logger LOG =
+      LoggerFactory.getLogger(LazyPersistTestCase.class);
   protected static final int BLOCK_SIZE = 5 * 1024 * 1024;
   protected static final int BUFFER_LENGTH = 4096;
-  private static final long HEARTBEAT_INTERVAL_SEC = 1;
-  private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
-  private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
-  private static final String JMX_SERVICE_NAME = "DataNode";
   protected static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
   protected static final int LAZY_WRITER_INTERVAL_SEC = 1;
-  protected static final Log LOG = LogFactory.getLog(LazyPersistTestCase.class);
   protected static final short REPL_FACTOR = 1;
+  private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
+  private static final String JMX_SERVICE_NAME = "DataNode";
+  private static final long HEARTBEAT_INTERVAL_SEC = 1;
+  private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
+  private static final int WAIT_FOR_FBR_MS = 10 * 1000;
+  private static final int WAIT_FOR_STORAGE_TYPES_MS = 30 * 1000;
+  private static final int WAIT_FOR_ASYNC_DELETE_MS = 10 * 1000;
+  private static final int WAIT_FOR_DN_SHUTDOWN_MS = 30 * 1000;
+  private static final int WAIT_FOR_REDUNDANCY_MS =
+      2 * DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT * 1000;
+  private static final int WAIT_FOR_LAZY_SCRUBBER_MS =
+      2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000;
+  private static final int WAIT_POLL_INTERVAL_MS = 10;
+  private static final int WAIT_POLL_INTERVAL_LARGE_MS = 20;
+
   protected final long osPageSize =
       NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();
@@ -154,7 +179,7 @@ public abstract class LazyPersistTestCase {
         return false;
         }
       }
-    }, 100, 30 * 1000);
+    }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_STORAGE_TYPES_MS);

     return client.getLocatedBlocks(path.toString(), 0, fileLength);
   }
@@ -429,11 +454,38 @@ public abstract class LazyPersistTestCase {
     private boolean disableScrubber=false;
   }

+  /**
+   * Force an FBR (full block report) on all the datanodes.
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
   protected final void triggerBlockReport()
-      throws IOException, InterruptedException {
+      throws IOException, InterruptedException, TimeoutException {
     // Trigger block report to NN
-    DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
-    Thread.sleep(10 * 1000);
+    final Map<DatanodeStorageInfo, Integer> reportCounts = new HashMap<>();
+    final FSNamesystem fsn = cluster.getNamesystem();
+    for (DataNode dn : cluster.getDataNodes()) {
+      final DatanodeDescriptor dnd =
+          NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
+      final DatanodeStorageInfo storage = dnd.getStorageInfos()[0];
+      reportCounts.put(storage, storage.getBlockReportCount());
+      DataNodeTestUtils.triggerBlockReport(dn);
+    }
+    // wait for block reports to be received.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        for (Entry<DatanodeStorageInfo, Integer> repCntEntry : reportCounts
+            .entrySet()) {
+          if (repCntEntry.getValue() == repCntEntry.getKey()
+              .getBlockReportCount()) {
+            return false;
+          }
+        }
+        return true;
+      }
+    }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_FBR_MS);
   }

   protected final boolean verifyBlockDeletedFromDir(File dir,
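triggerBlockReport() above replaces a blind 10-second sleep with a precise handshake: snapshot each storage's block-report counter, trigger the reports, then poll until every counter has moved past its snapshot. Here is the same technique in a self-contained form; the names (ReportWaiter, reportCounts) are assumptions for illustration, not HDFS APIs.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

final class ReportWaiter {

  static void waitForAllToReport(Map<String, AtomicInteger> reportCounts,
      long pollMs, long timeoutMs)
      throws InterruptedException, TimeoutException {
    // Snapshot the counters before triggering, so a report that raced in
    // earlier cannot satisfy the wait.
    final Map<String, Integer> before = new HashMap<>();
    reportCounts.forEach((src, cnt) -> before.put(src, cnt.get()));

    final long deadline = System.currentTimeMillis() + timeoutMs;
    while (true) {
      boolean allReported = true;
      for (Map.Entry<String, Integer> e : before.entrySet()) {
        // A source has reported once its live counter passed the snapshot.
        if (reportCounts.get(e.getKey()).get() == e.getValue()) {
          allReported = false;
          break;
        }
      }
      if (allReported) {
        return;
      }
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException("some sources never reported");
      }
      Thread.sleep(pollMs);
    }
  }
}

Snapshotting before triggering is the important detail: comparing against a fixed expected count instead would be racy whenever an unrelated report arrives first.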
@@ -445,51 +497,65 @@ public abstract class LazyPersistTestCase {
       File blockFile = new File(targetDir, lb.getBlock().getBlockName());
       if (blockFile.exists()) {
-        LOG.warn("blockFile: " + blockFile.getAbsolutePath() +
-            " exists after deletion.");
         return false;
       }
       File metaFile = new File(targetDir,
           DatanodeUtil.getMetaName(lb.getBlock().getBlockName(),
               lb.getBlock().getGenerationStamp()));
       if (metaFile.exists()) {
-        LOG.warn("metaFile: " + metaFile.getAbsolutePath() +
-            " exists after deletion.");
         return false;
       }
     }
     return true;
   }

-  protected final boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
-      throws IOException, InterruptedException {
+  protected final boolean verifyDeletedBlocks(final LocatedBlocks locatedBlocks)
+      throws Exception {

     LOG.info("Verifying replica has no saved copy after deletion.");
     triggerBlockReport();
+    final DataNode dn = cluster.getDataNodes().get(0);

-    while(
-      cluster.getFsDatasetTestUtils(0).getPendingAsyncDeletions()
-        > 0L){
-      Thread.sleep(1000);
-    }
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        for (DataNode dn1 : cluster.getDataNodes()) {
+          if (cluster.getFsDatasetTestUtils(dn1).getPendingAsyncDeletions()
+              > 0) {
+            return false;
+          }
+        }
+        return true;
+      }
+    }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_ASYNC_DELETE_MS);

     final String bpid = cluster.getNamesystem().getBlockPoolId();
-    final FsDatasetSpi<?> dataset =
-        cluster.getDataNodes().get(0).getFSDataset();
-
+    final FsDatasetSpi<?> dataset = dn.getFSDataset();
     // Make sure deleted replica does not have a copy on either finalized dir of
-    // transient volume or finalized dir of non-transient volume
+    // transient volume or finalized dir of non-transient volume.
+    // We need to wait until the async deletion is scheduled.
     try (FsDatasetSpi.FsVolumeReferences volumes =
         dataset.getFsVolumeReferences()) {
-      for (FsVolumeSpi vol : volumes) {
-        FsVolumeImpl volume = (FsVolumeImpl) vol;
-        File targetDir = (volume.isTransientStorage()) ?
-            volume.getBlockPoolSlice(bpid).getFinalizedDir() :
-            volume.getBlockPoolSlice(bpid).getLazypersistDir();
-        if (verifyBlockDeletedFromDir(targetDir, locatedBlocks) == false) {
-          return false;
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          try {
+            for (FsVolumeSpi vol : volumes) {
+              FsVolumeImpl volume = (FsVolumeImpl) vol;
+              File targetDir = (volume.isTransientStorage()) ?
+                  volume.getBlockPoolSlice(bpid).getFinalizedDir() :
+                  volume.getBlockPoolSlice(bpid).getLazypersistDir();
+              if (!LazyPersistTestCase.this
+                  .verifyBlockDeletedFromDir(targetDir, locatedBlocks)) {
+                return false;
+              }
+            }
+            return true;
+          } catch (IOException ie) {
+            return false;
+          }
        }
-      }
+      }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_ASYNC_DELETE_MS);
     }
     return true;
   }
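Both waits in verifyDeletedBlocks() lean on GenericTestUtils.waitFor(check, pollMs, timeoutMs). For readers unfamiliar with that utility, the sketch below reduces the contract it provides to its core; the helper name (Wait.until) is assumed for illustration, and the tests themselves use the real Hadoop utility.

import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

final class Wait {

  // Poll check every pollMs until it returns true, or fail with a
  // TimeoutException after timeoutMs. This bounds test latency in both
  // directions, unlike a fixed-length sleep.
  static void until(Supplier<Boolean> check, long pollMs, long timeoutMs)
      throws InterruptedException, TimeoutException {
    final long deadline = System.currentTimeMillis() + timeoutMs;
    while (!check.get()) {
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException("condition not met within " + timeoutMs + " ms");
      }
      Thread.sleep(pollMs);
    }
  }
}

Draining the pending async deletions then reads, for instance, Wait.until(() -> pendingDeletions() == 0, 10, 10_000), where pendingDeletions() stands in for the dataset test-utils call used above.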
@@ -534,4 +600,104 @@ public abstract class LazyPersistTestCase {
     FsDatasetImpl fsDataset = (FsDatasetImpl) dn.getFSDataset();
     fsDataset.evictLazyPersistBlocks(Long.MAX_VALUE); // Run one eviction cycle.
   }
+
+  /**
+   * Shut down the DataNodes and wait for the NN to detect them as
+   * being dead.
+   */
+  protected void shutdownDataNodes()
+      throws TimeoutException, InterruptedException {
+    cluster.shutdownDataNodes();
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          DatanodeInfo[] info = client.datanodeReport(
+              HdfsConstants.DatanodeReportType.LIVE);
+          return info.length == 0;
+        } catch (IOException e) {
+          return false;
+        }
+      }
+    }, WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_DN_SHUTDOWN_MS);
+  }
+
+  protected void waitForCorruptBlock(final long corruptCnt)
+      throws TimeoutException, InterruptedException {
+    // wait for the redundancy monitor to mark the file as corrupt.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        Iterator<BlockInfo> bInfoIter = cluster.getNameNode()
+            .getNamesystem().getBlockManager().getCorruptReplicaBlockIterator();
+        int count = 0;
+        while (bInfoIter.hasNext()) {
+          bInfoIter.next();
+          count++;
+        }
+        return corruptCnt == count;
+      }
+    }, 2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_REDUNDANCY_MS);
+  }
+
+  protected void waitForScrubberCycle()
+      throws TimeoutException, InterruptedException {
+    // wait for the lazy persist file scrubber to complete a cycle.
+    final FSNamesystem fsn = cluster.getNamesystem();
+    final long lastTimeStamp = fsn.getLazyPersistFileScrubberTS();
+    if (lastTimeStamp == -1) { // scrubber is disabled
+      return;
+    }
+    GenericTestUtils.waitFor(
+        new Supplier<Boolean>() {
+          @Override
+          public Boolean get() {
+            return lastTimeStamp != fsn.getLazyPersistFileScrubberTS();
+          }
+        }, 2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_LAZY_SCRUBBER_MS);
+  }
+
+  protected void waitForRedundancyMonitorCycle()
+      throws TimeoutException, InterruptedException {
+    // wait for the redundancy monitor to complete a full cycle.
+    final BlockManager bm = cluster.getNamesystem().getBlockManager();
+    final long lastRedundancyTS =
+        bm.getLastReplicationCycleTS();
+
+    GenericTestUtils.waitFor(
+        new Supplier<Boolean>() {
+          @Override
+          public Boolean get() {
+            return lastRedundancyTS != bm.getLastReplicationCycleTS();
+          }
+        },
+        2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_REDUNDANCY_MS);
+  }
+
+  protected void waitForRedundancyCount(final long cnt)
+      throws TimeoutException, InterruptedException {
+    // wait for the under-replicated block count to reach the expected value.
+    final BlockManager bm = cluster.getNamesystem().getBlockManager();
+
+    GenericTestUtils.waitFor(
+        new Supplier<Boolean>() {
+          @Override
+          public Boolean get() {
+            return cnt == bm.getUnderReplicatedBlocksCount();
+          }
+        }, 2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_REDUNDANCY_MS);
+  }
+
+  protected void waitForFile(final Path path, final boolean expected)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          return expected == fs.exists(path);
+        } catch (IOException e) {
+          return false;
+        }
+      }
+    }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_STORAGE_TYPES_MS);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index 8c43592..b177a7b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -16,8 +16,6 @@
  * limitations under the License.
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

-import com.google.common.collect.Iterators;
-import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -33,7 +31,6 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;

 import static org.apache.hadoop.fs.StorageType.RAM_DISK;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
@@ -98,28 +95,20 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
     makeTestFile(path1, BLOCK_SIZE, true);
     ensureFileReplicasOnStorageType(path1, RAM_DISK);

-    // Stop the DataNode and sleep for the time it takes the NN to
-    // detect the DN as being dead.
-    cluster.shutdownDataNodes();
-    Thread.sleep(30000L);
+    // Stop the DataNode.
+    shutdownDataNodes();

     assertThat(cluster.getNamesystem().getNumDeadDataNodes(), is(1));

-    // Next, wait for the replication monitor to mark the file as corrupt
-    Thread.sleep(2 * DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT * 1000);
-
+    // Next, wait for the redundancy monitor to mark the file as corrupt.
+    waitForRedundancyMonitorCycle();
     // Wait for the LazyPersistFileScrubber to run
-    Thread.sleep(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000);
-
+    waitForScrubberCycle();
     // Ensure that path1 does not exist anymore, whereas path2 does.
-    assert(!fs.exists(path1));
+    waitForFile(path1, false);

     // We should have zero blocks that need replication, i.e. the one
-    // belonging to path2.
-    assertThat(cluster.getNameNode()
-        .getNamesystem()
-        .getBlockManager()
-        .getUnderReplicatedBlocksCount(),
-        is(0L));
+    // belonging to path2. This needs a wait.
+    waitForRedundancyCount(0L);
   }

   @Test
@@ -134,18 +123,14 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {

     // Stop the DataNode and sleep for the time it takes the NN to
     // detect the DN as being dead.
-    cluster.shutdownDataNodes();
-    Thread.sleep(30000L);
-
-    // Next, wait for the replication monitor to mark the file as corrupt
-    Thread.sleep(2 * DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT * 1000);
+    shutdownDataNodes();
+    // wait for the redundancy monitor to mark the file as corrupt.
+    waitForCorruptBlock(1L);
     // Wait for the LazyPersistFileScrubber to run
-    Thread.sleep(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000);
-
+    waitForScrubberCycle();
     // Ensure that path1 exists.
-    Assert.assertTrue(fs.exists(path1));
-
+    waitForFile(path1, true);
   }

   /**
@@ -160,21 +145,14 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
     makeTestFile(path1, BLOCK_SIZE, true);
     ensureFileReplicasOnStorageType(path1, RAM_DISK);

-    cluster.shutdownDataNodes();
+    shutdownDataNodes();

     cluster.restartNameNodes();

-    // wait for the replication monitor to mark the file as corrupt
-    Thread.sleep(2 * DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT * 1000);
-
-    Long corruptBlkCount = (long) Iterators.size(cluster.getNameNode()
-        .getNamesystem().getBlockManager().getCorruptReplicaBlockIterator());
-
-    // Check block detected as corrupted
-    assertThat(corruptBlkCount, is(1L));
-
+    // wait for the redundancy monitor to mark the file as corrupt.
+    waitForCorruptBlock(1L);
     // Ensure path1 exists.
-    Assert.assertTrue(fs.exists(path1));
+    waitForFile(path1, true);
   }

   /**
@@ -216,10 +194,19 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
       threads[i].start();
     }

-    Thread.sleep(500);

     for (int i = 0; i < NUM_TASKS; i++) {
-      Uninterruptibles.joinUninterruptibly(threads[i]);
+      boolean interrupted = false;
+      while (true) {
+        try {
+          threads[i].join();
+          break;
+        } catch (InterruptedException e) {
+          interrupted = true;
+        }
+      }
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
     }
     Assert.assertFalse(testFailed.get());
   }
@@ -233,7 +220,7 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
    */
   @Test
   public void testConcurrentWrites()
-      throws IOException, InterruptedException {
+      throws IOException, InterruptedException, TimeoutException {
     getClusterBuilder().setRamDiskReplicaCapacity(9).build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     final int SEED = 0xFADED;
@@ -282,11 +269,11 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
       this.seed = seed;
       this.latch = latch;
       this.bFail = bFail;
-      System.out.println("Creating Writer: " + id);
+      LOG.info("Creating Writer: " + id);
     }

     public void run() {
-      System.out.println("Writer " + id + " starting... ");
+      LOG.info("Writer " + id + " starting... ");
       int i = 0;
       try {
         for (i = 0; i < paths.length; i++) {
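The loop that replaces Guava's Uninterruptibles.joinUninterruptibly in the TestLazyPersistFiles hunk above is a standard idiom worth calling out: keep joining across interrupts, remember that an interrupt happened, and restore the thread's interrupt status before returning. Extracted into a self-contained helper with an illustrative name:

final class Joins {

  static void joinUninterruptibly(Thread t) {
    boolean interrupted = false;
    while (true) {
      try {
        t.join(); // wait for the thread, retrying if we get interrupted
        break;
      } catch (InterruptedException e) {
        interrupted = true; // remember the interrupt, but keep waiting
      }
    }
    if (interrupted) {
      // Restore the interrupt status so callers can still observe it.
      Thread.currentThread().interrupt();
    }
  }
}

Swallowing the InterruptedException without the final interrupt() call would silently discard a cancellation request aimed at the test thread.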
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java
index c16dbe5..b6413ec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java
@@ -119,7 +119,7 @@ public class TestLazyPersistReplicaPlacement extends LazyPersistTestCase {
    */
   @Test
   public void testFallbackToDiskPartial()
-      throws IOException, InterruptedException {
+      throws IOException, InterruptedException, TimeoutException {
     getClusterBuilder().setMaxLockedMemory(2 * BLOCK_SIZE).build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java
index 1680764..56cc41e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -156,7 +157,6 @@ public class TestLazyWriter extends LazyPersistTestCase {
     for (int i = 0; i < NUM_PATHS; ++i) {
       makeTestFile(paths[i + NUM_PATHS], BLOCK_SIZE, true);
       triggerBlockReport();
-      Thread.sleep(3000);
       ensureFileReplicasOnStorageType(paths[i + NUM_PATHS], RAM_DISK);
       ensureFileReplicasOnStorageType(paths[indexes.get(i)], DEFAULT);

       for (int j = i + 1; j < NUM_PATHS; ++j) {
@@ -183,13 +183,13 @@ public class TestLazyWriter extends LazyPersistTestCase {
       throws Exception {
     getClusterBuilder().build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
-    FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
+    final DataNode dn = cluster.getDataNodes().get(0);
+    FsDatasetTestUtil.stopLazyWriter(dn);

     Path path = new Path("/" + METHOD_NAME + ".dat");
     makeTestFile(path, BLOCK_SIZE, true);
     LocatedBlocks locatedBlocks =
         ensureFileReplicasOnStorageType(path, RAM_DISK);
-
     // Delete before persist
     client.delete(path.toString(), false);
     Assert.assertFalse(fs.exists(path));

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org