Author: kihwal
Date: Tue Apr 16 22:01:18 2013
New Revision: 1468629

URL: http://svn.apache.org/r1468629
Log:
MAPREDUCE-5065. DistCp should skip checksum comparisons if block-sizes are 
different on source/target. Contributed by Mithun Radhakrishnan.

Modified:
    
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
    
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java

Modified: 
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java?rev=1468629&r1=1468628&r2=1468629&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
 (original)
+++ 
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
 Tue Apr 16 22:01:18 2013
@@ -140,10 +140,17 @@ public class RetriableFileCopyCommand ex
   private void compareCheckSums(FileSystem sourceFS, Path source,
                                 FileSystem targetFS, Path target)
                                 throws IOException {
-    if (!DistCpUtils.checksumsAreEqual(sourceFS, source, targetFS, target))
-      throw new IOException("Check-sum mismatch between "
-                              + source + " and " + target);
-
+    if (!DistCpUtils.checksumsAreEqual(sourceFS, source, targetFS, target)) {
+      StringBuilder errorMessage = new StringBuilder("Check-sum mismatch between ")
+          .append(source).append(" and ").append(target).append(".");
+      if (sourceFS.getFileStatus(source).getBlockSize() != targetFS.getFileStatus(target).getBlockSize()) {
+        errorMessage.append(" Source and target differ in block-size.")
+            .append(" Use -pb to preserve block-sizes during copy.")
+            .append(" Alternatively, skip checksum-checks altogether, using -skipCrc.")
+                                               .append(" (NOTE: By skipping checksums, one runs the risk of masking data-corruption during file-transfer.)");
+      }
+      throw new IOException(errorMessage.toString());
+    }
   }
 
   //If target file exists and unable to delete target - fail

Modified: 
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java?rev=1468629&r1=1468628&r2=1468629&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
 (original)
+++ 
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
 Tue Apr 16 22:01:18 2013
@@ -53,7 +53,7 @@ public class TestCopyMapper {
   private static final Log LOG = LogFactory.getLog(TestCopyMapper.class);
   private static List<Path> pathList = new ArrayList<Path>();
   private static int nFiles = 0;
-  private static final int FILE_SIZE = 1024;
+  private static final int DEFAULT_FILE_SIZE = 1024;
 
   private static MiniDFSCluster cluster;
 
@@ -92,7 +92,7 @@ public class TestCopyMapper {
     configuration.setBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(),
             false);
     configuration.setBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(),
-            true);
+            false);
     configuration.setBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(),
             true);
     configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
@@ -112,6 +112,18 @@ public class TestCopyMapper {
     touchFile(SOURCE_PATH + "/7/8/9");
   }
 
+  private static void createSourceDataWithDifferentBlockSize() throws Exception {
+    mkdirs(SOURCE_PATH + "/1");
+    mkdirs(SOURCE_PATH + "/2");
+    mkdirs(SOURCE_PATH + "/2/3/4");
+    mkdirs(SOURCE_PATH + "/2/3");
+    mkdirs(SOURCE_PATH + "/5");
+    touchFile(SOURCE_PATH + "/5/6", true);
+    mkdirs(SOURCE_PATH + "/7");
+    mkdirs(SOURCE_PATH + "/7/8");
+    touchFile(SOURCE_PATH + "/7/8/9");
+  }
+
   private static void mkdirs(String path) throws Exception {
     FileSystem fileSystem = cluster.getFileSystem();
     final Path qualifiedPath = new 
Path(path).makeQualified(fileSystem.getUri(),
@@ -121,17 +133,31 @@ public class TestCopyMapper {
   }
 
   private static void touchFile(String path) throws Exception {
+    touchFile(path, false);
+  }
+
+  private static void touchFile(String path, boolean createMultipleBlocks) throws Exception {
+    final long NON_DEFAULT_BLOCK_SIZE = 4096;
     FileSystem fs;
     DataOutputStream outputStream = null;
     try {
       fs = cluster.getFileSystem();
       final Path qualifiedPath = new Path(path).makeQualified(fs.getUri(),
                                                       
fs.getWorkingDirectory());
-      final long blockSize = fs.getDefaultBlockSize(qualifiedPath) * 2;
+      final long blockSize = createMultipleBlocks? NON_DEFAULT_BLOCK_SIZE : fs.getDefaultBlockSize(qualifiedPath) * 2;
       outputStream = fs.create(qualifiedPath, true, 0,
               (short)(fs.getDefaultReplication(qualifiedPath)*2),
               blockSize);
-      outputStream.write(new byte[FILE_SIZE]);
+      byte[] bytes = new byte[DEFAULT_FILE_SIZE];
+      outputStream.write(bytes);
+      long fileSize = DEFAULT_FILE_SIZE;
+      if (createMultipleBlocks) {
+        while (fileSize < 2*blockSize) {
+          outputStream.write(bytes);
+          outputStream.flush();
+          fileSize += DEFAULT_FILE_SIZE;
+        }
+      }
       pathList.add(qualifiedPath);
       ++nFiles;
 
@@ -144,7 +170,7 @@ public class TestCopyMapper {
     }
   }
 
-  @Test
+  @Test(timeout=40000)
   public void testRun() {
     try {
       deleteState();
@@ -179,7 +205,7 @@ public class TestCopyMapper {
 
       Assert.assertEquals(pathList.size(),
               
stubContext.getReporter().getCounter(CopyMapper.Counter.COPY).getValue());
-      Assert.assertEquals(nFiles * FILE_SIZE,
+      Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE,
               
stubContext.getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED).getValue());
 
       testCopyingExistingFiles(fs, copyMapper, context);
@@ -211,7 +237,7 @@ public class TestCopyMapper {
     }
   }
 
-  @Test
+  @Test(timeout=40000)
   public void testMakeDirFailure() {
     try {
       deleteState();
@@ -239,13 +265,13 @@ public class TestCopyMapper {
     }
   }
 
-  @Test
+  @Test(timeout=40000)
   public void testIgnoreFailures() {
     doTestIgnoreFailures(true);
     doTestIgnoreFailures(false);
   }
 
-  @Test
+  @Test(timeout=40000)
   public void testDirToFile() {
     try {
       deleteState();
@@ -273,7 +299,7 @@ public class TestCopyMapper {
     }
   }
 
-  @Test
+  @Test(timeout=40000)
   public void testPreserve() {
     try {
       deleteState();
@@ -343,7 +369,7 @@ public class TestCopyMapper {
     }
   }
 
-  @Test
+  @Test(timeout=40000)
   public void testCopyReadableFiles() {
     try {
       deleteState();
@@ -406,7 +432,7 @@ public class TestCopyMapper {
     }
   }
 
-  @Test
+  @Test(timeout=40000)
   public void testSkipCopyNoPerms() {
     try {
       deleteState();
@@ -480,7 +506,7 @@ public class TestCopyMapper {
     }
   }
 
-  @Test
+  @Test(timeout=40000)
   public void testFailCopyWithAccessControlException() {
     try {
       deleteState();
@@ -563,7 +589,7 @@ public class TestCopyMapper {
     }
   }
 
-  @Test
+  @Test(timeout=40000)
   public void testFileToDir() {
     try {
       deleteState();
@@ -640,12 +666,48 @@ public class TestCopyMapper {
     cluster.getFileSystem().delete(new Path(TARGET_PATH), true);
   }
 
-  @Test
+  @Test(timeout=40000)
   public void testPreserveBlockSizeAndReplication() {
     testPreserveBlockSizeAndReplicationImpl(true);
     testPreserveBlockSizeAndReplicationImpl(false);
   }
 
+  @Test(timeout=40000)
+  public void testCopyFailOnBlockSizeDifference() {
+    try {
+
+      deleteState();
+      createSourceDataWithDifferentBlockSize();
+
+      FileSystem fs = cluster.getFileSystem();
+      CopyMapper copyMapper = new CopyMapper();
+      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+      Mapper<Text, FileStatus, Text, Text>.Context context
+          = stubContext.getContext();
+
+      Configuration configuration = context.getConfiguration();
+      EnumSet<DistCpOptions.FileAttribute> fileAttributes
+          = EnumSet.noneOf(DistCpOptions.FileAttribute.class);
+      configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
+          DistCpUtils.packAttributes(fileAttributes));
+
+      copyMapper.setup(context);
+
+      for (Path path : pathList) {
+        final FileStatus fileStatus = fs.getFileStatus(path);
+        copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+            fileStatus, context);
+      }
+
+      Assert.fail("Copy should have failed because of block-size difference.");
+    }
+    catch (Exception exception) {
+      // Check that the exception suggests the use of -pb/-skipCrc.
+      Assert.assertTrue("Failure exception should have suggested the use of -pb.", exception.getCause().getCause().getMessage().contains("pb"));
+      Assert.assertTrue("Failure exception should have suggested the use of -skipCrc.", exception.getCause().getCause().getMessage().contains("skipCrc"));
+    }
+  }
+
   private void testPreserveBlockSizeAndReplicationImpl(boolean preserve){
     try {
 
@@ -717,7 +779,7 @@ public class TestCopyMapper {
    * If a single file is being copied to a location where the file (of the same
    * name) already exists, then the file shouldn't be skipped.
    */
-  @Test
+  @Test(timeout=40000)
   public void testSingleFileCopy() {
     try {
       deleteState();
@@ -766,7 +828,7 @@ public class TestCopyMapper {
     }
   }
 
-  @Test
+  @Test(timeout=40000)
   public void testPreserveUserGroup() {
     testPreserveUserGroupImpl(true);
     testPreserveUserGroupImpl(false);


Reply via email to