This is an automated email from the ASF dual-hosted git repository.

anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 69bf986  HIVE-24328: Run distcp in parallel for all file entries in repl load.(Aasha Medhi, reviewed by Pravin Kumar Sinha)
69bf986 is described below

commit 69bf9862052bd3f59296387b436d2e76d2ef2619
Author: Anishek Agarwal <anis...@gmail.com>
AuthorDate: Fri Nov 13 13:51:29 2020 +0530

    HIVE-24328: Run distcp in parallel for all file entries in repl load.(Aasha Medhi, reviewed by Pravin Kumar Sinha)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |  6 ++
 .../hive/ql/parse/TestReplicationScenarios.java    |  1 -
 .../hadoop/hive/ql/parse/repl/CopyUtils.java       | 67 +++++++++++++++++-----
 .../hadoop/hive/ql/parse/repl/TestCopyUtils.java   | 45 +++++++++++++++
 4 files changed, 105 insertions(+), 14 deletions(-)
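
At a high level, the commit replaces CopyUtils' serial per-destination copy loop with a fixed-size thread pool, sized by the new hive.repl.parallel.copy.tasks setting, that runs one copy task per destination directory. A minimal standalone sketch of that pattern follows; the class and variable names (ParallelCopySketch, destinations, pool) are illustrative and not part of the committed code:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ParallelCopySketch {
      public static void main(String[] args) throws InterruptedException {
        List<String> destinations = Arrays.asList("dest1", "dest2", "dest3");
        // Pool size plays the role of hive.repl.parallel.copy.tasks (default 100 in the commit).
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
          // One Callable per destination directory, as CopyUtils#copyAndVerify now builds.
          List<Callable<Void>> tasks = new ArrayList<>();
          for (String dest : destinations) {
            tasks.add(() -> {
              System.out.println("copying to " + dest); // stand-in for doCopy(...)
              return null;
            });
          }
          pool.invokeAll(tasks); // blocks until every copy task has finished
        } finally {
          pool.shutdown(); // the commit likewise shuts the pool down in a finally block
        }
      }
    }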

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 94da886..e8c7bf8 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -647,6 +647,12 @@ public class HiveConf extends Configuration {
       "Provide the maximum number of partitions of a table that will be 
batched together during  \n"
         + "repl load. All the partitions in a batch will make a single 
metastore call to update the metadata. \n"
         + "The data for these partitions will be copied before copying the 
metadata batch. "),
+    REPL_PARALLEL_COPY_TASKS("hive.repl.parallel.copy.tasks",100,
+      "Provide the maximum number of parallel copy operation(distcp or regular 
copy) launched for a table  \n"
+        + "or partition. This will create at max 100 threads which will run 
copy in parallel for the data files at \n"
+        + " table or partition level. If hive.exec.parallel \n"
+        + "is set to true then max worker threads created for copy can be 
hive.exec.parallel.thread.number(determines \n"
+        + "number of copy tasks in parallel) * hive.repl.parallel.copy.tasks 
"),
     LOCALSCRATCHDIR("hive.exec.local.scratchdir",
         "${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
         "Local scratch space for Hive jobs"),
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 6028282..ab81834 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -108,7 +108,6 @@ import java.util.Base64;
 
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
-import static org.apache.hadoop.hive.metastore.Warehouse.getFs;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.NON_RECOVERABLE_MARKER;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index 5a662ff..db68250 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.parse.repl;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.fs.ContentSummary;
@@ -45,6 +46,9 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
 public class CopyUtils {
@@ -60,12 +64,14 @@ public class CopyUtils {
   private final boolean hiveInReplTest;
   private final String copyAsUser;
   private FileSystem destinationFs;
+  private final int maxParallelCopyTask;
 
   public CopyUtils(String distCpDoAsUser, HiveConf hiveConf, FileSystem destinationFs) {
     this.hiveConf = hiveConf;
     maxNumberOfFiles = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES);
     maxCopyFileSize = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE);
     hiveInReplTest = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL);
+    maxParallelCopyTask = hiveConf.getIntVar(HiveConf.ConfVars.REPL_PARALLEL_COPY_TASKS);
     this.copyAsUser = distCpDoAsUser;
     this.destinationFs = destinationFs;
   }
@@ -82,27 +88,33 @@ public class CopyUtils {
     }
     FileSystem sourceFs = srcFiles.get(0).getSrcFs();
     boolean useRegularCopy = regularCopy(sourceFs, srcFiles);
+    ExecutorService executorService = null;
     try {
       if (useRegularCopy || readSrcAsFilesList) {
         // Layout of data files may differ based on the type of tables.
         Map<FileSystem, Map< Path, List<ReplChangeManager.FileInfo>>> map = fsToFileMap(srcFiles, destRoot);
         for (Map.Entry<FileSystem, Map<Path, List<ReplChangeManager.FileInfo>>> entry : map.entrySet()) {
           Map<Path, List<ReplChangeManager.FileInfo>> destMap = entry.getValue();
-          for (Map.Entry<Path, List<ReplChangeManager.FileInfo>> destMapEntry : destMap.entrySet()) {
-            Path destination = destMapEntry.getKey();
-            List<ReplChangeManager.FileInfo> fileInfoList = destMapEntry.getValue();
-            // Get the file system again from cache. There is a chance that the file system stored in the map is closed.
-            // For instance, doCopyRetry closes the file system in case of i/o exceptions.
-            sourceFs = fileInfoList.get(0).getSourcePath().getFileSystem(hiveConf);
-            if (!destinationFs.exists(destination)
-                    && !FileUtils.mkdir(destinationFs, destination, hiveConf)) {
-              LOG.error("Failed to create destination directory: " + destination);
-              throw new IOException("Destination directory creation failed");
+          if (destMap.size() > 1) {
+            //Multiple files, do copy in parallel
+            if (executorService == null) {
+              executorService = getExecutorService();
+            }
+            List<Callable<Void>> copyList = new ArrayList<>();
+            for (Map.Entry<Path, List<ReplChangeManager.FileInfo>> destMapEntry : destMap.entrySet()) {
+              copyList.add(() -> {
+                doCopy(destMapEntry, proxyUser, regularCopy(sourceFs, destMapEntry.getValue()), overwrite);
+                return null;
+              });
+            }
+            executorService.invokeAll(copyList);
+          } else {
+            //Since just a single file, just do a copy in the same thread
+            for (Map.Entry<Path, List<ReplChangeManager.FileInfo>> destMapEntry : destMap.entrySet()) {
+              doCopy(destMapEntry, proxyUser, regularCopy(sourceFs, destMapEntry.getValue()), overwrite);
             }
-
-            // Copy files with retry logic on failure or source file is dropped or changed.
-            doCopyRetry(sourceFs, fileInfoList, destination, proxyUser, useRegularCopy, overwrite);
           }
+
         }
       } else {
         // When distCp is to be used and the srcFiles doesn't contain subDirs (readSrcAsFilesList=false),
@@ -112,13 +124,42 @@ public class CopyUtils {
         srcFiles.add(new ReplChangeManager.FileInfo(sourceFs, origSrcPath, null));
         doCopyRetry(sourceFs, srcFiles, destRoot, proxyUser, useRegularCopy, overwrite);
       }
+    } catch (InterruptedException e) {
+      LOG.error("Failed to copy ", e);
+      throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg());
     } finally {
+      if (executorService != null) {
+        executorService.shutdown();
+      }
       if (proxyUser != null) {
         FileSystem.closeAllForUGI(proxyUser);
       }
     }
   }
 
+  @VisibleForTesting
+  ExecutorService getExecutorService() {
+    return Executors.newFixedThreadPool(maxParallelCopyTask);
+  }
+
+  @VisibleForTesting
+  void doCopy(Map.Entry<Path, List<ReplChangeManager.FileInfo>> destMapEntry, UserGroupInformation proxyUser,
+                      boolean useRegularCopy, boolean overwrite) throws IOException, LoginException,
+    HiveFatalException {
+    Path destination = destMapEntry.getKey();
+    List<ReplChangeManager.FileInfo> fileInfoList = destMapEntry.getValue();
+    // Get the file system again from cache. There is a chance that the file system stored in the map is closed.
+    // For instance, doCopyRetry closes the file system in case of i/o exceptions.
+    FileSystem sourceFsOfFileInfo = fileInfoList.get(0).getSourcePath().getFileSystem(hiveConf);
+    if (!destinationFs.exists(destination)
+      && !FileUtils.mkdir(destinationFs, destination, hiveConf)) {
+      LOG.error("Failed to create destination directory: " + destination);
+      throw new IOException("Destination directory creation failed");
+    }
+    // Copy files with retry logic on failure or source file is dropped or changed.
+    doCopyRetry(sourceFsOfFileInfo, fileInfoList, destination, proxyUser, useRegularCopy, overwrite);
+  }
+
   private void doCopyRetry(FileSystem sourceFs, List<ReplChangeManager.FileInfo> srcFileList,
                            Path destination, UserGroupInformation proxyUser,
                            boolean useRegularCopy, boolean overwrite) throws IOException,
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java
index 535bc6d..94993bb 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -35,7 +36,11 @@ import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import static org.junit.Assert.assertFalse;
 import static org.mockito.ArgumentMatchers.anyBoolean;
@@ -43,6 +48,7 @@ import static org.mockito.ArgumentMatchers.anyListOf;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.same;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
 import static org.powermock.api.mockito.PowerMockito.when;
@@ -103,4 +109,43 @@ public class TestCopyUtils {
 
     copyUtils.doCopy(destination, srcPaths);
   }
+
+  @Test
+  public void testParallelCopySuccess() throws Exception {
+    mockStatic(UserGroupInformation.class);
+    when(UserGroupInformation.getCurrentUser()).thenReturn(mock(UserGroupInformation.class));
+    HiveConf conf = Mockito.spy(new HiveConf());
+    when(conf.getIntVar(HiveConf.ConfVars.REPL_PARALLEL_COPY_TASKS)).thenReturn(2);
+    when(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL)).thenReturn(true);
+    FileSystem destFs = mock(FileSystem.class);
+    when(destFs.exists(Mockito.any())).thenReturn(true);
+    CopyUtils copyUtils = new CopyUtils(UserGroupInformation.getCurrentUser().getUserName(), conf, destFs);
+    CopyUtils copyUtilsSpy = Mockito.spy(copyUtils);
+    ExecutorService executorService = Executors.newFixedThreadPool(2);
+    ExecutorService mockExecutorService = Mockito.spy(executorService);
+    when(copyUtilsSpy.getExecutorService()).thenReturn(mockExecutorService);
+    Path destination = new Path("dest");
+    Path source = mock(Path.class);
+    FileSystem fs = mock(FileSystem.class);
+    ReplChangeManager.FileInfo srcFileInfo = new ReplChangeManager.FileInfo(fs, source, "path1");
+    List<ReplChangeManager.FileInfo> srcFiles = Arrays.asList(srcFileInfo);
+    doNothing().when(copyUtilsSpy).doCopy(Mockito.any(), Mockito.any(),
+      Mockito.anyBoolean(), Mockito.anyBoolean());
+    copyUtilsSpy.copyAndVerify(destination, srcFiles, source, true, true);
+    Class<Collection<? extends Callable<Void>>> listClass =
+      (Class<Collection<? extends Callable<Void>>>)(Class)List.class;
+    //Thread pool Not invoked as only one target path
+    ArgumentCaptor<Collection<? extends Callable<Void>>> callableCapture = ArgumentCaptor.forClass(listClass);
+    Mockito.verify(mockExecutorService, Mockito.times(0)).invokeAll(callableCapture.capture());
+    ReplChangeManager.FileInfo srcFileInfo1 = new ReplChangeManager.FileInfo(fs, source, "path2");
+    ReplChangeManager.FileInfo srcFileInfo2 = new ReplChangeManager.FileInfo(fs, source, "path3");
+    srcFiles = Arrays.asList(srcFileInfo1, srcFileInfo2);
+    executorService = Executors.newFixedThreadPool(2);
+    mockExecutorService = Mockito.spy(executorService);
+    when(copyUtilsSpy.getExecutorService()).thenReturn(mockExecutorService);
+    copyUtilsSpy.copyAndVerify(destination, srcFiles, source, true, true);
+    //File count is greater than 1 do thread pool invoked
+    Mockito.verify(mockExecutorService,
+      Mockito.times(1)).invokeAll(callableCapture.capture());
+  }
 }
\ No newline at end of file
