HIVE-12988 : Improve dynamic partition loading IV (Ashutosh Chauhan via Prasanth J)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a14ef8ab
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a14ef8ab
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a14ef8ab

Branch: refs/heads/llap
Commit: a14ef8abe1df1516b8b9f486030bc3d584f940a9
Parents: 1de97bc
Author: Ashutosh Chauhan <hashut...@apache.org>
Authored: Tue Feb 2 18:03:44 2016 -0800
Committer: Ashutosh Chauhan <hashut...@apache.org>
Committed: Tue Mar 29 11:27:12 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java |   4 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java  | 252 +++++++++++--------
 .../org/apache/hadoop/fs/ProxyFileSystem.java |   5 +-
 3 files changed, 155 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/a14ef8ab/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b8870f2..f03c1ab 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2326,6 +2326,8 @@ public class HiveConf extends Configuration {
     HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,reload,compile",
         "Comma separated list of non-SQL Hive commands users are authorized to execute"),
+    HIVE_MOVE_FILES_THREAD_COUNT("hive.mv.files.thread", 25, new SizeValidator(1L, true, 1024L, true), "Number of threads"
+        + " used to move files in move task"),
     // If this is set all move tasks at the end of a multi-insert query will only begin once all
     // outputs are ready
     HIVE_MULTI_INSERT_MOVE_TASKS_SHARE_DEPENDENCIES(
@@ -2771,7 +2773,7 @@ public class HiveConf extends Configuration {
     SPARK_RPC_SASL_MECHANISM("hive.spark.client.rpc.sasl.mechanisms", "DIGEST-MD5",
         "Name of the SASL mechanism to use for authentication."),
     SPARK_RPC_SERVER_ADDRESS("hive.spark.client.rpc.server.address", "",
-        "The server address of HiverServer2 host to be used for communication between Hive client and remote Spark driver. " +
+        "The server address of HiverServer2 host to be used for communication between Hive client and remote Spark driver. " +
         "Default is empty, which means the address will be determined in the same way as for hive.server2.thrift.bind.host."
         + "This is only necessary if the host has mutiple network addresses and if a different network address other than "
         + "hive.server2.thrift.bind.host is to be used."),
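
A minimal sketch of how the new hive.mv.files.thread setting is consumed,
based on the Hive.java hunks below; the class name is hypothetical and only
the config read and pool construction mirror the commit. The validated
thread count (default 25, bounded to [1, 1024] by the SizeValidator) sizes
a daemon pool used for file moves.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import com.google.common.util.concurrent.ThreadFactoryBuilder;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

    class MovePoolSketch { // hypothetical class, for illustration only
      static ExecutorService newMovePool(HiveConf conf) {
        // hive.mv.files.thread: default 25, validated to [1, 1024]
        int threads = conf.getIntVar(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT);
        return Executors.newFixedThreadPool(threads,
            new ThreadFactoryBuilder().setDaemon(true)
                .setNameFormat("MoveDir-Thread-%d").build());
      }
    }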
+ "This is only necessary if the host has mutiple network addresses and if a different network address other than " + "hive.server2.thrift.bind.host is to be used."), http://git-wip-us.apache.org/repos/asf/hive/blob/a14ef8ab/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 6d27f55..c27481f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -32,19 +32,25 @@ import java.io.IOException; import java.io.PrintStream; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.collect.ImmutableMap; + import javax.jdo.JDODataStoreException; import org.apache.hadoop.conf.Configuration; @@ -132,6 +138,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * This class has functions that implement meta data/DDL operations using calls @@ -1504,7 +1511,7 @@ public class Hive { isSrcLocal); } else { if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null) { - newFiles = new ArrayList<>(); + newFiles = Collections.synchronizedList(new ArrayList<Path>()); } FileSystem fs = tbl.getDataLocation().getFileSystem(conf); @@ -1751,9 +1758,13 @@ private void constructOneLBLocationMap(FileStatus fSta, public void loadTable(Path loadPath, String tableName, boolean replace, boolean isSrcLocal, boolean isSkewedStoreAsSubdir, boolean isAcid) throws HiveException { - List<Path> newFiles = new ArrayList<Path>(); + + List<Path> newFiles = null; Table tbl = getTable(tableName); HiveConf sessionConf = SessionState.getSessionConf(); + if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) { + newFiles = Collections.synchronizedList(new ArrayList<Path>()); + } if (replace) { Path tableDest = tbl.getPath(); replaceFiles(tableDest, loadPath, tableDest, tableDest, sessionConf, isSrcLocal); @@ -2579,75 +2590,91 @@ private void constructOneLBLocationMap(FileStatus fSta, } } - // for each file or directory in 'srcs', make mapping for every file in src to safe name in dest - private static List<List<Path[]>> checkPaths(HiveConf conf, FileSystem fs, - FileStatus[] srcs, FileSystem srcFs, Path destf, boolean replace) - throws HiveException { + private static void copyFiles(final HiveConf conf, final FileSystem destFs, + FileStatus[] srcs, final FileSystem srcFs, final Path destf, final boolean isSrcLocal, final List<Path> newFiles) + throws HiveException { - List<List<Path[]>> result = new ArrayList<List<Path[]>>(); + final HadoopShims.HdfsFileStatus fullDestStatus; try { - FileStatus destStatus = !replace ? 
@@ -2579,75 +2590,91 @@ private void constructOneLBLocationMap(FileStatus fSta,
     }
   }
 
-  // for each file or directory in 'srcs', make mapping for every file in src to safe name in dest
-  private static List<List<Path[]>> checkPaths(HiveConf conf, FileSystem fs,
-      FileStatus[] srcs, FileSystem srcFs, Path destf, boolean replace)
-      throws HiveException {
+  private static void copyFiles(final HiveConf conf, final FileSystem destFs,
+      FileStatus[] srcs, final FileSystem srcFs, final Path destf, final boolean isSrcLocal, final List<Path> newFiles)
+      throws HiveException {
 
-    List<List<Path[]>> result = new ArrayList<List<Path[]>>();
+    final HadoopShims.HdfsFileStatus fullDestStatus;
     try {
-      FileStatus destStatus = !replace ? FileUtils.getFileStatusOrNull(fs, destf) : null;
-      if (destStatus != null && !destStatus.isDir()) {
-        throw new HiveException("checkPaths: destination " + destf
-            + " should be a directory");
-      }
-      for (FileStatus src : srcs) {
-        FileStatus[] items;
-        if (src.isDir()) {
-          items = srcFs.listStatus(src.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
-          Arrays.sort(items);
+      fullDestStatus = ShimLoader.getHadoopShims().getFullFileStatus(conf, destFs, destf);
+    } catch (IOException e1) {
+      throw new HiveException(e1);
+    }
+
+    if (!fullDestStatus.getFileStatus().isDirectory()) {
+      throw new HiveException(destf + " is not a directory.");
+    }
+    final boolean inheritPerms = HiveConf.getBoolVar(conf,
+        HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
+    final List<Future<ObjectPair<Path, Path>>> futures = new LinkedList<>();
+    final ExecutorService pool = Executors.newFixedThreadPool(
+        conf.getIntVar(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT),
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MoveDir-Thread-%d").build());
+
+    for (FileStatus src : srcs) {
+      FileStatus[] files;
+      if (src.isDirectory()) {
+        try {
+          files = srcFs.listStatus(src.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
+        } catch (IOException e) {
+          pool.shutdownNow();
+          throw new HiveException(e);
+        }
+      } else {
+        files = new FileStatus[] {src};
+      }
+
+      for (FileStatus srcFile : files) {
+
+        final Path srcP = srcFile.getPath();
+        final boolean needToCopy = needToCopy(srcP, destf, srcFs, destFs);
+        // Strip off the file type, if any so we don't make:
+        // 000000_0.gz -> 000000_0.gz_copy_1
+        final String name;
+        final String filetype;
+        String itemName = srcP.getName();
+        int index = itemName.lastIndexOf('.');
+        if (index >= 0) {
+          filetype = itemName.substring(index);
+          name = itemName.substring(0, index);
         } else {
-          items = new FileStatus[] {src};
+          name = itemName;
+          filetype = "";
         }
-
-        List<Path[]> srcToDest = new ArrayList<Path[]>();
-        for (FileStatus item : items) {
-
-          Path itemSource = item.getPath();
-
-          if (Utilities.isTempPath(item)) {
-            // This check is redundant because temp files are removed by
-            // execution layer before
-            // calling loadTable/Partition. But leaving it in just in case.
-            srcFs.delete(itemSource, true);
-            continue;
-          }
-
-          Path itemDest = new Path(destf, itemSource.getName());
-
-          if (!replace) {
-            // Strip off the file type, if any so we don't make:
-            // 000000_0.gz -> 000000_0.gz_copy_1
-            String name = itemSource.getName();
-            String filetype;
-            int index = name.lastIndexOf('.');
-            if (index >= 0) {
-              filetype = name.substring(index);
-              name = name.substring(0, index);
+        futures.add(pool.submit(new Callable<ObjectPair<Path, Path>>() {
+          @Override
+          public ObjectPair<Path, Path> call() throws Exception {
+            Path destPath = new Path(destf, srcP.getName());
+            if (!needToCopy && !isSrcLocal) {
+              for (int counter = 1; !destFs.rename(srcP,destPath); counter++) {
+                destPath = new Path(destf, name + ("_copy_" + counter) + filetype);
+              }
             } else {
-              filetype = "";
+              destPath = mvFile(conf, srcP, destPath, isSrcLocal, srcFs, destFs, name, filetype);
             }
-            // It's possible that the file we're copying may have the same
-            // relative name as an existing file in the "destf" directory.
-            // So let's make a quick check to see if we can rename any
-            // potential offenders so as to allow them to move into the
-            // "destf" directory. The scheme is dead simple: simply tack
-            // on "_copy_N" where N starts at 1 and works its way up until
-            // we find a free space.
-
-            // removed source file staging.. it's more confusing when failed.
-            for (int counter = 1; fs.exists(itemDest) || destExists(result, itemDest); counter++) {
-              itemDest = new Path(destf, name + ("_copy_" + counter) + filetype);
+
+            if (inheritPerms) {
+              ShimLoader.getHadoopShims().setFullFileStatus(conf, fullDestStatus, destFs, destf);
             }
+            if (null != newFiles) {
+              newFiles.add(destPath);
+            }
+            return ObjectPair.create(srcP, destPath);
          }
-          srcToDest.add(new Path[]{itemSource, itemDest});
-        }
-        result.add(srcToDest);
+        }));
+      }
+    }
+    pool.shutdown();
+    for (Future<ObjectPair<Path, Path>> future : futures) {
+      try {
+        ObjectPair<Path, Path> pair = future.get();
+        LOG.debug("Moved src: {}", pair.getFirst().toString(), ", to dest: {}", pair.getSecond().toString());
+      } catch (Exception e) {
+        LOG.error("Failed to move: {}", e.getMessage());
+        pool.shutdownNow();
+        throw new HiveException(e.getCause());
      }
-    } catch (IOException e) {
-      throw new HiveException("checkPaths: filesystem error in check phase. " + e.getMessage(), e);
    }
-    return result;
  }
 
   private static boolean destExists(List<List<Path[]>> result, Path proposed) {
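
The rewritten copyFiles fans out one Callable per source file, shuts the
pool down to new work, and then drains the futures so the first failure
cancels what remains. A self-contained sketch of that submit-then-collect
pattern, with plain strings standing in for the actual rename/copy calls
(class name and task bodies are hypothetical):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class ParallelMoveSketch { // hypothetical, for illustration only
      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<String>> futures = new ArrayList<>();
        for (final String f : new String[] {"a", "b", "c"}) {
          futures.add(pool.submit(new Callable<String>() {
            @Override
            public String call() { return "moved " + f; } // stand-in for rename/copy
          }));
        }
        pool.shutdown(); // accept no new tasks; submitted ones keep running
        for (Future<String> future : futures) {
          try {
            System.out.println(future.get()); // rethrows any task failure
          } catch (ExecutionException e) {
            pool.shutdownNow(); // abandon remaining moves, as copyFiles does
            throw new RuntimeException(e.getCause());
          }
        }
      }
    }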
@@ -2704,14 +2731,34 @@ private void constructOneLBLocationMap(FileStatus fSta,
     return ShimLoader.getHadoopShims().getPathWithoutSchemeAndAuthority(path).toString();
   }
 
+  private static Path mvFile(HiveConf conf, Path srcf, Path destf, boolean isSrcLocal,
+      FileSystem srcFs, FileSystem destFs, String srcName, String filetype) throws IOException {
+
+    for (int counter = 1; destFs.exists(destf); counter++) {
+      destf = new Path(destf.getParent(), srcName + ("_copy_" + counter) + filetype);
+    }
+    if (isSrcLocal) {
+      // For local src file, copy to hdfs
+      destFs.copyFromLocalFile(srcf, destf);
+    } else {
+      //copy if across file system or encryption zones.
+      LOG.info("Copying source " + srcf + " to " + destf + " because HDFS encryption zones are different.");
+      FileUtils.copy(srcFs, srcf, destFs, destf,
+          true,  // delete source
+          false, // overwrite destination
+          conf);
+    }
+    return destf;
+  }
+
   //it is assumed that parent directory of the destf should already exist when this
   //method is called. when the replace value is true, this method works a little different
   //from mv command if the destf is a directory, it replaces the destf instead of moving under
   //the destf. in this case, the replaced destf still preserves the original destf's permission
-  public static boolean moveFile(HiveConf conf, Path srcf, Path destf,
+  public static boolean moveFile(HiveConf conf, Path srcf, final Path destf,
       boolean replace, boolean isSrcLocal) throws HiveException {
     boolean success = false;
-    FileSystem srcFs, destFs;
+    final FileSystem srcFs, destFs;
     try {
       destFs = destf.getFileSystem(conf);
     } catch (IOException e) {
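
mvFile probes for the first free name_copy_N destination before copying.
The same naming loop in isolation, with java.io.File standing in for
Hadoop's FileSystem purely for brevity (hypothetical helper):

    import java.io.File;

    class CopySuffixSketch { // hypothetical, for illustration only
      // Probe name.ext, name_copy_1.ext, name_copy_2.ext, ... until an
      // unused destination is found, as mvFile() does with destFs.exists().
      static File freeDestination(File dir, String name, String filetype) {
        File dest = new File(dir, name + filetype);
        for (int counter = 1; dest.exists(); counter++) {
          dest = new File(dir, name + "_copy_" + counter + filetype);
        }
        return dest;
      }
    }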
@@ -2775,31 +2822,38 @@ private void constructOneLBLocationMap(FileStatus fSta,
           FileStatus[] srcs = destFs.listStatus(srcf, FileUtils.HIDDEN_FILES_PATH_FILTER);
           if (srcs.length == 0) {
             success = true; // Nothing to move.
-          }
-          /* Move files one by one because source is a subdirectory of destination */
-          for (FileStatus status : srcs) {
-            Path destFile;
-
-            /* Append the source filename to the destination directory */
-            if (destFs.isDirectory(destf)) {
-              destFile = new Path(destf, status.getPath().getName());
-            } else {
-              destFile = destf;
+          } else {
+            List<Future<Boolean>> futures = new LinkedList<>();
+            final ExecutorService pool = Executors.newFixedThreadPool(
+                conf.getIntVar(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT),
+                new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MoveDir-Thread-%d").build());
+            /* Move files one by one because source is a subdirectory of destination */
+            for (final FileStatus status : srcs) {
+              futures.add(pool.submit(new Callable<Boolean>() {
+                @Override
+                public Boolean call() throws Exception {
+                  return destFs.rename(status.getPath(), destf);
+                }
+              }));
            }
-
-            // Destination should be replaced, so we delete it first
-            if (destFs.exists(destFile)) {
-              if (!destFs.delete(destFile, true)) {
-                throw new HiveException(String.format("File to replace could not be deleted: %s", destFile));
+            pool.shutdown();
+            boolean allFutures = true;
+            for (Future<Boolean> future : futures) {
+              try {
+                Boolean result = future.get();
+                allFutures &= result;
+                if (!result) {
+                  LOG.debug("Failed to rename.");
+                  pool.shutdownNow();
+                }
+              } catch (Exception e) {
+                LOG.debug("Failed to rename.", e.getMessage());
+                pool.shutdownNow();
+                throw new HiveException(e.getCause());
              }
            }
-
-            if (!(destFs.rename(status.getPath(), destFile))) {
-              throw new HiveException("Unable to move source " + status.getPath() + " to destination " + destf);
-            }
+            success = allFutures;
          }
-
-          success = true;
        } else {
          success = destFs.rename(srcf, destf);
        }
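
In this branch the source is a subdirectory of the destination, so each
child is renamed straight into destf; FileSystem.rename signals most
failures by returning false rather than throwing, which is why moveFile
ANDs the Boolean futures together. A condensed sketch of that aggregation
(hypothetical names, generic Callables in place of the rename tasks):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    class RenameAllSketch { // hypothetical, for illustration only
      static boolean allSucceeded(List<Callable<Boolean>> tasks, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<Future<Boolean>> futures = new ArrayList<>();
        for (Callable<Boolean> t : tasks) {
          futures.add(pool.submit(t));
        }
        pool.shutdown();
        boolean all = true;
        for (Future<Boolean> f : futures) {
          try {
            boolean ok = f.get();
            all &= ok;
            if (!ok) {
              pool.shutdownNow(); // a false result aborts remaining work
            }
          } catch (ExecutionException e) {
            pool.shutdownNow();
            throw new Exception(e.getCause());
          }
        }
        return all;
      }
    }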
@@ -2825,8 +2879,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
   /**
    * If moving across different FileSystems or differnent encryption zone, need to do a File copy instead of rename.
    * TODO- consider if need to do this for different file authority.
+   * @throws HiveException
    */
-  static protected boolean needToCopy(Path srcf, Path destf, FileSystem srcFs, FileSystem destFs) throws HiveException, IOException {
+  static protected boolean needToCopy(Path srcf, Path destf, FileSystem srcFs, FileSystem destFs) throws HiveException {
     //Check if different FileSystems
     if (!srcFs.getClass().equals(destFs.getClass())) {
       return true;
@@ -2834,8 +2889,12 @@ private void constructOneLBLocationMap(FileStatus fSta,
 
     //Check if different encryption zones
     HadoopShims.HdfsEncryptionShim hdfsEncryptionShim = SessionState.get().getHdfsEncryptionShim();
-    return hdfsEncryptionShim != null && (hdfsEncryptionShim.isPathEncrypted(srcf) || hdfsEncryptionShim.isPathEncrypted(destf))
-      && !hdfsEncryptionShim.arePathsOnSameEncryptionZone(srcf, destf);
+    try {
+      return hdfsEncryptionShim != null && (hdfsEncryptionShim.isPathEncrypted(srcf) || hdfsEncryptionShim.isPathEncrypted(destf))
+        && !hdfsEncryptionShim.arePathsOnSameEncryptionZone(srcf, destf);
+    } catch (IOException e) {
+      throw new HiveException(e);
+    }
   }
 
   /**
@@ -2886,22 +2945,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
       if (isAcid) {
         moveAcidFiles(srcFs, srcs, destf, newFiles);
       } else {
-        // check that source and target paths exist
-        List<List<Path[]>> result = checkPaths(conf, fs, srcs, srcFs, destf, false);
-        // move it, move it
-        try {
-          for (List<Path[]> sdpairs : result) {
-            for (Path[] sdpair : sdpairs) {
-              if (!moveFile(conf, sdpair[0], sdpair[1], false, isSrcLocal)) {
-                throw new IOException("Cannot move " + sdpair[0] + " to "
-                    + sdpair[1]);
-              }
-              if (newFiles != null) newFiles.add(sdpair[1]);
-            }
-          }
-        } catch (IOException e) {
-          throw new HiveException("copyFiles: error while moving files!!! " + e.getMessage(), e);
-        }
+        copyFiles(conf, fs, srcs, srcFs, destf, isSrcLocal, newFiles);
       }
   }


http://git-wip-us.apache.org/repos/asf/hive/blob/a14ef8ab/shims/common/src/main/java/org/apache/hadoop/fs/ProxyFileSystem.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/fs/ProxyFileSystem.java b/shims/common/src/main/java/org/apache/hadoop/fs/ProxyFileSystem.java
index cb1e2b7..2c37a51 100644
--- a/shims/common/src/main/java/org/apache/hadoop/fs/ProxyFileSystem.java
+++ b/shims/common/src/main/java/org/apache/hadoop/fs/ProxyFileSystem.java
@@ -82,6 +82,7 @@ public class ProxyFileSystem extends FilterFileSystem {
    * @return
    * @throws IOException
    */
+  @Override
   public Path resolvePath(final Path p) throws IOException {
     // Return the fully-qualified path of path f resolving the path
     // through any symlinks or mount point
@@ -174,7 +175,9 @@ public class ProxyFileSystem extends FilterFileSystem {
 
   @Override
   public boolean rename(Path src, Path dst) throws IOException {
-    return super.rename(swizzleParamPath(src), swizzleParamPath(dst));
+    Path dest = swizzleParamPath(dst);
+    // Make sure for existing destination we return false as per FileSystem api contract
+    return super.isFile(dest) ? false : super.rename(swizzleParamPath(src), dest);
   }
 
   @Override
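
The rename override brings ProxyFileSystem in line with the FileSystem
contract that renaming onto an existing file returns false instead of
clobbering it. The same guard as a free-standing helper (hypothetical
name):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class RenameContractSketch { // hypothetical, for illustration only
      // Refuse to rename onto an existing file, mirroring the override above.
      static boolean renameChecked(FileSystem fs, Path src, Path dst) throws IOException {
        if (fs.isFile(dst)) {
          return false; // per the FileSystem#rename contract: don't clobber files
        }
        return fs.rename(src, dst);
      }
    }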