This is an automated email from the ASF dual-hosted git repository.

anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 69bf986 HIVE-24328: Run distcp in parallel for all file entries in repl load.(Aasha Medhi, reviewed by Pravin Kumar Sinha) 69bf986 is described below commit 69bf9862052bd3f59296387b436d2e76d2ef2619 Author: Anishek Agarwal <anis...@gmail.com> AuthorDate: Fri Nov 13 13:51:29 2020 +0530 HIVE-24328: Run distcp in parallel for all file entries in repl load.(Aasha Medhi, reviewed by Pravin Kumar Sinha) --- .../java/org/apache/hadoop/hive/conf/HiveConf.java | 6 ++ .../hive/ql/parse/TestReplicationScenarios.java | 1 - .../hadoop/hive/ql/parse/repl/CopyUtils.java | 67 +++++++++++++++++----- .../hadoop/hive/ql/parse/repl/TestCopyUtils.java | 45 +++++++++++++++ 4 files changed, 105 insertions(+), 14 deletions(-) diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 94da886..e8c7bf8 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -647,6 +647,12 @@ public class HiveConf extends Configuration { "Provide the maximum number of partitions of a table that will be batched together during \n" + "repl load. All the partitions in a batch will make a single metastore call to update the metadata. \n" + "The data for these partitions will be copied before copying the metadata batch. "), + REPL_PARALLEL_COPY_TASKS("hive.repl.parallel.copy.tasks",100, + "Provide the maximum number of parallel copy operation(distcp or regular copy) launched for a table \n" + + "or partition. This will create at max 100 threads which will run copy in parallel for the data files at \n" + + " table or partition level. 
If hive.exec.parallel \n" + + "is set to true then max worker threads created for copy can be hive.exec.parallel.thread.number(determines \n" + + "number of copy tasks in parallel) * hive.repl.parallel.copy.tasks "), LOCALSCRATCHDIR("hive.exec.local.scratchdir", "${system:java.io.tmpdir}" + File.separator + "${system:user.name}", "Local scratch space for Hive jobs"), diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 6028282..ab81834 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -108,7 +108,6 @@ import java.util.Base64; import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME; -import static org.apache.hadoop.hive.metastore.Warehouse.getFs; import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT; import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.NON_RECOVERABLE_MARKER; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java index 5a662ff..db68250 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.parse.repl; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import org.apache.commons.collections.CollectionUtils; import org.apache.hadoop.fs.ContentSummary; @@ -45,6 +46,9 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import 
java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.stream.Collectors; public class CopyUtils { @@ -60,12 +64,14 @@ public class CopyUtils { private final boolean hiveInReplTest; private final String copyAsUser; private FileSystem destinationFs; + private final int maxParallelCopyTask; public CopyUtils(String distCpDoAsUser, HiveConf hiveConf, FileSystem destinationFs) { this.hiveConf = hiveConf; maxNumberOfFiles = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES); maxCopyFileSize = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE); hiveInReplTest = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL); + maxParallelCopyTask = hiveConf.getIntVar(HiveConf.ConfVars.REPL_PARALLEL_COPY_TASKS); this.copyAsUser = distCpDoAsUser; this.destinationFs = destinationFs; } @@ -82,27 +88,33 @@ public class CopyUtils { } FileSystem sourceFs = srcFiles.get(0).getSrcFs(); boolean useRegularCopy = regularCopy(sourceFs, srcFiles); + ExecutorService executorService = null; try { if (useRegularCopy || readSrcAsFilesList) { // Layout of data files may differ based on the type of tables. Map<FileSystem, Map< Path, List<ReplChangeManager.FileInfo>>> map = fsToFileMap(srcFiles, destRoot); for (Map.Entry<FileSystem, Map<Path, List<ReplChangeManager.FileInfo>>> entry : map.entrySet()) { Map<Path, List<ReplChangeManager.FileInfo>> destMap = entry.getValue(); - for (Map.Entry<Path, List<ReplChangeManager.FileInfo>> destMapEntry : destMap.entrySet()) { - Path destination = destMapEntry.getKey(); - List<ReplChangeManager.FileInfo> fileInfoList = destMapEntry.getValue(); - // Get the file system again from cache. There is a chance that the file system stored in the map is closed. - // For instance, doCopyRetry closes the file system in case of i/o exceptions. 
- sourceFs = fileInfoList.get(0).getSourcePath().getFileSystem(hiveConf); - if (!destinationFs.exists(destination) - && !FileUtils.mkdir(destinationFs, destination, hiveConf)) { - LOG.error("Failed to create destination directory: " + destination); - throw new IOException("Destination directory creation failed"); + if (destMap.size() > 1) { + //Multiple files, do copy in parallel + if (executorService == null) { + executorService = getExecutorService(); + } + List<Callable<Void>> copyList = new ArrayList<>(); + for (Map.Entry<Path, List<ReplChangeManager.FileInfo>> destMapEntry : destMap.entrySet()) { + copyList.add(() -> { + doCopy(destMapEntry, proxyUser, regularCopy(sourceFs, destMapEntry.getValue()), overwrite); + return null; + }); + } + executorService.invokeAll(copyList); + } else { + //Since just a single file, just do a copy in the same thread + for (Map.Entry<Path, List<ReplChangeManager.FileInfo>> destMapEntry : destMap.entrySet()) { + doCopy(destMapEntry, proxyUser, regularCopy(sourceFs, destMapEntry.getValue()), overwrite); } - - // Copy files with retry logic on failure or source file is dropped or changed. 
- doCopyRetry(sourceFs, fileInfoList, destination, proxyUser, useRegularCopy, overwrite); } + } } else { // When distCp is to be used and the srcFiles doesn't contain subDirs (readSrcAsFilesList=false), @@ -112,13 +124,42 @@ public class CopyUtils { srcFiles.add(new ReplChangeManager.FileInfo(sourceFs, origSrcPath, null)); doCopyRetry(sourceFs, srcFiles, destRoot, proxyUser, useRegularCopy, overwrite); } + } catch (InterruptedException e) { + LOG.error("Failed to copy ", e); + throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg()); } finally { + if (executorService != null) { + executorService.shutdown(); + } if (proxyUser != null) { FileSystem.closeAllForUGI(proxyUser); } } } + @VisibleForTesting + ExecutorService getExecutorService() { + return Executors.newFixedThreadPool(maxParallelCopyTask); + } + + @VisibleForTesting + void doCopy(Map.Entry<Path, List<ReplChangeManager.FileInfo>> destMapEntry, UserGroupInformation proxyUser, + boolean useRegularCopy, boolean overwrite) throws IOException, LoginException, + HiveFatalException { + Path destination = destMapEntry.getKey(); + List<ReplChangeManager.FileInfo> fileInfoList = destMapEntry.getValue(); + // Get the file system again from cache. There is a chance that the file system stored in the map is closed. + // For instance, doCopyRetry closes the file system in case of i/o exceptions. + FileSystem sourceFsOfFileInfo = fileInfoList.get(0).getSourcePath().getFileSystem(hiveConf); + if (!destinationFs.exists(destination) + && !FileUtils.mkdir(destinationFs, destination, hiveConf)) { + LOG.error("Failed to create destination directory: " + destination); + throw new IOException("Destination directory creation failed"); + } + // Copy files with retry logic on failure or source file is dropped or changed. 
+ doCopyRetry(sourceFsOfFileInfo, fileInfoList, destination, proxyUser, useRegularCopy, overwrite); + } + private void doCopyRetry(FileSystem sourceFs, List<ReplChangeManager.FileInfo> srcFileList, Path destination, UserGroupInformation proxyUser, boolean useRegularCopy, boolean overwrite) throws IOException, diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java index 535bc6d..94993bb 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; @@ -35,7 +36,11 @@ import org.powermock.modules.junit4.PowerMockRunner; import java.io.IOException; import java.util.Arrays; +import java.util.Collection; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import static org.junit.Assert.assertFalse; import static org.mockito.ArgumentMatchers.anyBoolean; @@ -43,6 +48,7 @@ import static org.mockito.ArgumentMatchers.anyListOf; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.powermock.api.mockito.PowerMockito.mockStatic; import static org.powermock.api.mockito.PowerMockito.when; @@ -103,4 +109,43 @@ public class TestCopyUtils { copyUtils.doCopy(destination, srcPaths); } + + @Test + public void testParallelCopySuccess() throws Exception { + 
mockStatic(UserGroupInformation.class); + when(UserGroupInformation.getCurrentUser()).thenReturn(mock(UserGroupInformation.class)); + HiveConf conf = Mockito.spy(new HiveConf()); + when(conf.getIntVar(HiveConf.ConfVars.REPL_PARALLEL_COPY_TASKS)).thenReturn(2); + when(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL)).thenReturn(true); + FileSystem destFs = mock(FileSystem.class); + when(destFs.exists(Mockito.any())).thenReturn(true); + CopyUtils copyUtils = new CopyUtils(UserGroupInformation.getCurrentUser().getUserName(), conf, destFs); + CopyUtils copyUtilsSpy = Mockito.spy(copyUtils); + ExecutorService executorService = Executors.newFixedThreadPool(2); + ExecutorService mockExecutorService = Mockito.spy(executorService); + when(copyUtilsSpy.getExecutorService()).thenReturn(mockExecutorService); + Path destination = new Path("dest"); + Path source = mock(Path.class); + FileSystem fs = mock(FileSystem.class); + ReplChangeManager.FileInfo srcFileInfo = new ReplChangeManager.FileInfo(fs, source, "path1"); + List<ReplChangeManager.FileInfo> srcFiles = Arrays.asList(srcFileInfo); + doNothing().when(copyUtilsSpy).doCopy(Mockito.any(), Mockito.any(), + Mockito.anyBoolean(), Mockito.anyBoolean()); + copyUtilsSpy.copyAndVerify(destination, srcFiles, source, true, true); + Class<Collection<? extends Callable<Void>>> listClass = + (Class<Collection<? extends Callable<Void>>>)(Class)List.class; + //Thread pool Not invoked as only one target path + ArgumentCaptor<Collection<? 
extends Callable<Void>>> callableCapture = ArgumentCaptor.forClass(listClass); + Mockito.verify(mockExecutorService, Mockito.times(0)).invokeAll(callableCapture.capture()); + ReplChangeManager.FileInfo srcFileInfo1 = new ReplChangeManager.FileInfo(fs, source, "path2"); + ReplChangeManager.FileInfo srcFileInfo2 = new ReplChangeManager.FileInfo(fs, source, "path3"); + srcFiles = Arrays.asList(srcFileInfo1, srcFileInfo2); + executorService = Executors.newFixedThreadPool(2); + mockExecutorService = Mockito.spy(executorService); + when(copyUtilsSpy.getExecutorService()).thenReturn(mockExecutorService); + copyUtilsSpy.copyAndVerify(destination, srcFiles, source, true, true); + //File count is greater than 1 do thread pool invoked + Mockito.verify(mockExecutorService, + Mockito.times(1)).invokeAll(callableCapture.capture()); + } } \ No newline at end of file