Repository: hadoop
Updated Branches:
  refs/heads/branch-2 035f5f8f1 -> e19f91024
  refs/heads/branch-2.8 564d9e610 -> 911ae15f6
  refs/heads/trunk 7558dbbb4 -> 98bdb5139


HADOOP-13169. Randomize file list in SimpleCopyListing. Contributed by Rajesh Balamohan.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/98bdb513
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/98bdb513
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/98bdb513

Branch: refs/heads/trunk
Commit: 98bdb5139769eb55893971b43b9c23da9513a784
Parents: 7558dbb
Author: Chris Nauroth <cnaur...@apache.org>
Authored: Mon Sep 19 15:16:47 2016 -0700
Committer: Chris Nauroth <cnaur...@apache.org>
Committed: Mon Sep 19 15:16:47 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/tools/DistCpConstants.java    |   4 +
 .../apache/hadoop/tools/SimpleCopyListing.java  | 114 +++++++++++++++++--
 .../apache/hadoop/tools/TestCopyListing.java    |  83 +++++++++++++-
 3 files changed, 189 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/98bdb513/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
index 95d26df..96f364c 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
@@ -58,6 +58,10 @@ public class DistCpConstants {
   public static final String CONF_LABEL_APPEND = "distcp.copy.append";
   public static final String CONF_LABEL_DIFF = "distcp.copy.diff";
   public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb";
+  public static final String CONF_LABEL_SIMPLE_LISTING_FILESTATUS_SIZE =
+      "distcp.simplelisting.file.status.size";
+  public static final String CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES =
+      "distcp.simplelisting.randomize.files";
   public static final String CONF_LABEL_FILTERS_FILE =
       "distcp.filters.file";
   public static final String CONF_LABEL_MAX_CHUNKS_TOLERABLE =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/98bdb513/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
index 3f52203..bc30aa1 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.tools;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -42,7 +43,10 @@ import com.google.common.annotations.VisibleForTesting;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
 
 import static org.apache.hadoop.tools.DistCpConstants
         .HDFS_RESERVED_RAW_DIRECTORY_NAME;
@@ -56,13 +60,19 @@ import static org.apache.hadoop.tools.DistCpConstants
 public class SimpleCopyListing extends CopyListing {
   private static final Log LOG = LogFactory.getLog(SimpleCopyListing.class);
 
+  public static final int DEFAULT_FILE_STATUS_SIZE = 1000;
+  public static final boolean DEFAULT_RANDOMIZE_FILE_LISTING = true;
+
   private long totalPaths = 0;
   private long totalDirs = 0;
   private long totalBytesToCopy = 0;
   private int numListstatusThreads = 1;
+  private final int fileStatusLimit;
+  private final boolean randomizeFileListing;
   private final int maxRetries = 3;
   private CopyFilter copyFilter;
   private DistCpSync distCpSync;
+  private final Random rnd = new Random();
 
   /**
    * Protected constructor, to initialize configuration.
@@ -76,6 +86,17 @@ public class SimpleCopyListing extends CopyListing {
     numListstatusThreads = getConf().getInt(
         DistCpConstants.CONF_LABEL_LISTSTATUS_THREADS,
         DistCpConstants.DEFAULT_LISTSTATUS_THREADS);
+    fileStatusLimit = Math.max(1, getConf()
+        .getInt(DistCpConstants.CONF_LABEL_SIMPLE_LISTING_FILESTATUS_SIZE,
+        DEFAULT_FILE_STATUS_SIZE));
+    randomizeFileListing = getConf().getBoolean(
+        DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES,
+        DEFAULT_RANDOMIZE_FILE_LISTING);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("numListstatusThreads=" + numListstatusThreads
+          + ", fileStatusLimit=" + fileStatusLimit
+          + ", randomizeFileListing=" + randomizeFileListing);
+    }
     copyFilter = CopyFilter.getCopyFilter(getConf());
     copyFilter.initialize();
   }
@@ -83,9 +104,13 @@ public class SimpleCopyListing extends CopyListing {
   @VisibleForTesting
   protected SimpleCopyListing(Configuration configuration,
                               Credentials credentials,
-                              int numListstatusThreads) {
+                              int numListstatusThreads,
+                              int fileStatusLimit,
+                              boolean randomizeFileListing) {
     super(configuration, credentials);
     this.numListstatusThreads = numListstatusThreads;
+    this.fileStatusLimit = Math.max(1, fileStatusLimit);
+    this.randomizeFileListing = randomizeFileListing;
   }
 
   protected SimpleCopyListing(Configuration configuration,
@@ -236,6 +261,7 @@ public class SimpleCopyListing extends CopyListing {
     FileSystem sourceFS = sourceRoot.getFileSystem(getConf());
 
     try {
+      List<FileStatusInfo> fileStatuses = Lists.newArrayList();
       for (DiffInfo diff : diffList) {
         // add snapshot paths prefix
         diff.target = new Path(options.getSourcePaths().get(0), diff.target);
@@ -259,10 +285,13 @@ public class SimpleCopyListing extends CopyListing {
             sourceDirs.add(sourceStatus);
 
             traverseDirectory(fileListWriter, sourceFS, sourceDirs,
-                sourceRoot, options, excludeList);
+                sourceRoot, options, excludeList, fileStatuses);
           }
         }
       }
+      if (randomizeFileListing) {
+        writeToFileListing(fileStatuses, fileListWriter);
+      }
       fileListWriter.close();
       fileListWriter = null;
     } finally {
@@ -296,6 +325,7 @@ public class SimpleCopyListing extends CopyListing {
     }
 
     try {
+      List<FileStatusInfo> statusList = Lists.newArrayList();
       for (Path path: options.getSourcePaths()) {
         FileSystem sourceFS = path.getFileSystem(getConf());
         final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
@@ -326,8 +356,14 @@ public class SimpleCopyListing extends CopyListing {
                   preserveAcls && sourceStatus.isDirectory(),
                   preserveXAttrs && sourceStatus.isDirectory(),
                   preserveRawXAttrs && sourceStatus.isDirectory());
-            writeToFileListing(fileListWriter, sourceCopyListingStatus,
-                sourcePathRoot);
+            if (randomizeFileListing) {
+              addToFileListing(statusList,
+                  new FileStatusInfo(sourceCopyListingStatus, sourcePathRoot),
+                  fileListWriter);
+            } else {
+              writeToFileListing(fileListWriter, sourceCopyListingStatus,
+                  sourcePathRoot);
+            }
 
             if (sourceStatus.isDirectory()) {
               if (LOG.isDebugEnabled()) {
@@ -337,9 +373,12 @@ public class SimpleCopyListing extends CopyListing {
             }
           }
           traverseDirectory(fileListWriter, sourceFS, sourceDirs,
-                            sourcePathRoot, options, null);
+              sourcePathRoot, options, null, statusList);
         }
       }
+      if (randomizeFileListing) {
+        writeToFileListing(statusList, fileListWriter);
+      }
       fileListWriter.close();
       printStats();
       LOG.info("Build file listing completed.");
@@ -349,6 +388,52 @@ public class SimpleCopyListing extends CopyListing {
     }
   }
 
+  private void addToFileListing(List<FileStatusInfo> fileStatusInfoList,
+      FileStatusInfo statusInfo, SequenceFile.Writer fileListWriter)
+      throws IOException {
+    fileStatusInfoList.add(statusInfo);
+    if (fileStatusInfoList.size() >= fileStatusLimit) {
+      writeToFileListing(fileStatusInfoList, fileListWriter);
+    }
+  }
+
+  @VisibleForTesting
+  void setSeedForRandomListing(long seed) {
+    rnd.setSeed(seed);
+  }
+
+  /**
+   * Shuffles the buffered entries, writes each one to the file listing, then
+   * clears the buffer. Shuffling helps avoid region hotspots in cloud storage
+   * and keeps individual mappers from receiving many similar paths.
+   */
+  private void writeToFileListing(List<FileStatusInfo> fileStatusInfoList,
+      SequenceFile.Writer fileListWriter) throws IOException {
+    Collections.shuffle(fileStatusInfoList, rnd);
+    for (FileStatusInfo fileStatusInfo : fileStatusInfoList) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding " + fileStatusInfo.fileStatus.getPath());
+      }
+      writeToFileListing(fileListWriter, fileStatusInfo.fileStatus,
+          fileStatusInfo.sourceRootPath);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Number of paths written to fileListing="
+          + fileStatusInfoList.size());
+    }
+    fileStatusInfoList.clear();
+  }
+
+  private static final class FileStatusInfo {
+    private final CopyListingFileStatus fileStatus;
+    private final Path sourceRootPath;
+
+    FileStatusInfo(CopyListingFileStatus fileStatus, Path sourceRootPath) {
+      this.fileStatus = fileStatus;
+      this.sourceRootPath = sourceRootPath;
+    }
+  }
+
   private Path computeSourceRootPath(FileStatus sourceStatus,
                                      DistCpOptions options) throws IOException {
 
@@ -516,15 +601,18 @@ public class SimpleCopyListing extends CopyListing {
                                  ArrayList<FileStatus> sourceDirs,
                                  Path sourcePathRoot,
                                  DistCpOptions options,
-                                 HashSet<String> excludeList)
+                                 HashSet<String> excludeList,
+                                 List<FileStatusInfo> fileStatuses)
                                  throws IOException {
     final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
     final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
     final boolean preserveRawXattrs = options.shouldPreserveRawXattrs();
 
     assert numListstatusThreads > 0;
-    LOG.debug("Starting thread pool of " + numListstatusThreads +
-              " listStatus workers.");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Starting thread pool of " + numListstatusThreads +
+          " listStatus workers.");
+    }
     ProducerConsumer<FileStatus, FileStatus[]> workers =
         new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads);
     for (int i = 0; i < numListstatusThreads; i++) {
@@ -551,8 +639,14 @@ public class SimpleCopyListing extends CopyListing {
                 preserveAcls && child.isDirectory(),
                 preserveXAttrs && child.isDirectory(),
                 preserveRawXattrs && child.isDirectory());
-            writeToFileListing(fileListWriter, childCopyListingStatus,
-                 sourcePathRoot);
+            if (randomizeFileListing) {
+              addToFileListing(fileStatuses,
+                  new FileStatusInfo(childCopyListingStatus, sourcePathRoot),
+                  fileListWriter);
+            } else {
+              writeToFileListing(fileListWriter, childCopyListingStatus,
+                  sourcePathRoot);
+            }
           }
           if (retry < maxRetries) {
             if (child.isDirectory()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/98bdb513/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
index 896763d..ea63e23 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
@@ -25,7 +25,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.tools.util.TestDistCpUtils;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.security.Credentials;
@@ -46,7 +45,9 @@ import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Random;
 
 @RunWith(value = Parameterized.class)
 public class TestCopyListing extends SimpleCopyListing {
@@ -77,7 +78,7 @@ public class TestCopyListing extends SimpleCopyListing {
   }
 
   public TestCopyListing(int numListstatusThreads) {
-    super(config, CREDENTIALS, numListstatusThreads);
+    super(config, CREDENTIALS, numListstatusThreads, 0, false);
   }
 
   protected TestCopyListing(Configuration configuration) {
@@ -221,6 +222,84 @@ public class TestCopyListing extends SimpleCopyListing {
     }
   }
 
+  @Test(timeout=60000)
+  public void testWithRandomFileListing() throws IOException {
+    FileSystem fs = null;
+    try {
+      fs = FileSystem.get(getConf());
+      List<Path> srcPaths = new ArrayList<>();
+      List<Path> srcFiles = new ArrayList<>();
+      Path target = new Path("/tmp/out/1");
+      final int pathCount = 25;
+      for (int i = 0; i < pathCount; i++) {
+        Path p = new Path("/tmp", String.valueOf(i));
+        srcPaths.add(p);
+        fs.mkdirs(p);
+
+        Path fileName = new Path(p, i + ".txt");
+        srcFiles.add(fileName);
+        try (OutputStream out = fs.create(fileName)) {
+          out.write(i);
+        }
+      }
+
+      Path listingFile = new Path("/tmp/file");
+      DistCpOptions options = new DistCpOptions(srcPaths, target);
+      options.setSyncFolder(true);
+
+      // Check without randomizing files
+      getConf().setBoolean(
+          DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, false);
+      SimpleCopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS);
+      listing.buildListing(listingFile, options);
+
+      Assert.assertEquals(pathCount, listing.getNumberOfPaths());
+      validateFinalListing(listingFile, srcFiles);
+      fs.delete(listingFile, true);
+
+      // Check with randomized file listing
+      getConf().setBoolean(
+          DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, true);
+      listing = new SimpleCopyListing(getConf(), CREDENTIALS);
+
+      // Set the seed for randomness, so that it can be verified later
+      long seed = System.nanoTime();
+      listing.setSeedForRandomListing(seed);
+      listing.buildListing(listingFile, options);
+      Assert.assertEquals(pathCount, listing.getNumberOfPaths());
+
+      // Replaying the shuffle with the same seed must reproduce the order
+      Collections.shuffle(srcFiles, new Random(seed));
+      validateFinalListing(listingFile, srcFiles);
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp");
+    }
+  }
+
+  private void validateFinalListing(Path pathToListFile, List<Path> srcFiles)
+      throws IOException {
+    FileSystem fs = pathToListFile.getFileSystem(config);
+
+    try (SequenceFile.Reader reader = new SequenceFile.Reader(
+        config, SequenceFile.Reader.file(pathToListFile))) {
+      CopyListingFileStatus currentVal = new CopyListingFileStatus();
+
+      Text currentKey = new Text();
+      int idx = 0;
+      while (reader.next(currentKey)) {
+        reader.getCurrentValue(currentVal);
+        Assert.assertEquals("srcFiles.size=" + srcFiles.size()
+                + ", idx=" + idx, fs.makeQualified(srcFiles.get(idx)),
+            currentVal.getPath());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("val=" + fs.makeQualified(srcFiles.get(idx)));
+        }
+        idx++;
+      }
+      Assert.assertEquals(srcFiles.size(), idx);
+    }
+  }
+
   @Test(timeout=10000)
   public void testBuildListingForSingleFile() {
     FileSystem fs = null;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to