This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch branch-3 in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/branch-3 by this push:
     new f45197e6bb3 HIVE-26815: Backport HIVE-26758 (Allow use scratchdir for staging final job) (#3840)
f45197e6bb3 is described below

commit f45197e6bb3c4d4d9413de2ad12b01c2b923f0fc
Author: yigress <104102129+yigr...@users.noreply.github.com>
AuthorDate: Wed Dec 7 13:23:12 2022 -0800

    HIVE-26815: Backport HIVE-26758 (Allow use scratchdir for staging final job) (#3840)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |  6 ++-
 .../hadoop/hive/ql/exec/FileSinkOperator.java      | 13 ++++--
 .../org/apache/hadoop/hive/ql/exec/MoveTask.java   |  4 ++
 .../org/apache/hadoop/hive/ql/metadata/Hive.java   |  7 ++-
 .../hadoop/hive/ql/optimizer/GenMapRedUtils.java   | 13 ++++--
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java     | 53 ++++++++++------------
 6 files changed, 57 insertions(+), 39 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 31ea29bc81c..44acef4532d 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4485,7 +4485,11 @@ public class HiveConf extends Configuration {
         "This parameter enables a number of optimizations when running on blobstores:\n" +
         "(1) If hive.blobstore.use.blobstore.as.scratchdir is false, force the last Hive job to write to the blobstore.\n" +
         "This is a performance optimization that forces the final FileSinkOperator to write to the blobstore.\n" +
-        "See HIVE-15121 for details.");
+        "See HIVE-15121 for details."),
+
+    HIVE_USE_SCRATCHDIR_FOR_STAGING("hive.use.scratchdir.for.staging", false,
+        "Use ${hive.exec.scratchdir} for query results instead of ${hive.exec.stagingdir}.\n" +
+        "This stages query results in ${hive.exec.scratchdir} before moving to final destination.");

     public final String varname;
     public final String altName;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 56c32bf78cc..713c19a6888 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -845,11 +845,14 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
   }

   private void createDpDirCheckSrc(final Path dpStagingPath, final Path dpFinalPath) throws IOException {
-    if (!fs.exists(dpStagingPath) && !fs.exists(dpFinalPath)) {
-      fs.mkdirs(dpStagingPath);
-      // move task will create dp final path
-      if (reporter != null) {
-        reporter.incrCounter(counterGroup, Operator.HIVE_COUNTER_CREATED_DYNAMIC_PARTITIONS, 1);
+    if (!fs.exists(dpStagingPath)) {
+      FileSystem dpFs = dpFinalPath.getFileSystem(hconf);
+      if (!dpFs.exists(dpFinalPath)) {
+        fs.mkdirs(dpStagingPath);
+        // move task will create dp final path
+        if (reporter != null) {
+          reporter.incrCounter(counterGroup, Operator.HIVE_COUNTER_CREATED_DYNAMIC_PARTITIONS, 1);
+        }
       }
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index e6ab88fc127..315381d85b9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -311,6 +311,10 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
             }
           }
         } else {
+          FileSystem targetFs = targetPath.getFileSystem(conf);
+          if (!targetFs.exists(targetPath.getParent())){
+            targetFs.mkdirs(targetPath.getParent());
+          }
           moveFile(sourcePath, targetPath, lfd.getIsDfsDir());
         }
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 6ca82a67123..2d461ada49c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1805,8 +1805,8 @@ public class Hive {
            * See: HIVE-1707 and HIVE-2117 for background
            */
           FileSystem oldPartPathFS = oldPartPath.getFileSystem(getConf());
-          FileSystem loadPathFS = loadPath.getFileSystem(getConf());
-          if (FileUtils.equalsFileSystem(oldPartPathFS,loadPathFS)) {
+          FileSystem tblPathFS = tblDataLocationPath.getFileSystem(getConf());
+          if (FileUtils.equalsFileSystem(oldPartPathFS, tblPathFS)) {
             newPartPath = oldPartPath;
           }
         }
@@ -4314,6 +4314,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
     if (isNeedRecycle && conf.getBoolVar(HiveConf.ConfVars.REPLCMENABLED)) {
       recycleDirToCmPath(path, purge);
     }
+    if (!fs.exists(path)) {
+      return;
+    }
     FileStatus[] statuses = fs.listStatus(path, pathFilter);
     if (statuses == null || statuses.length == 0) {
       return;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index f7eb711fb74..d333e6b8c7d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -1990,9 +1990,16 @@ public final class GenMapRedUtils {
     // it must be on the same file system as the current destination
     Context baseCtx = parseCtx.getContext();

-    // Create the required temporary file in the HDFS location if the destination
-    // path of the FileSinkOperator table is a blobstore path.
-    Path tmpDir = baseCtx.getTempDirForFinalJobPath(fileSinkDesc.getDestPath());
+    Path tmpDir = null;
+    if (hconf.getBoolVar(ConfVars.HIVE_USE_SCRATCHDIR_FOR_STAGING)) {
+      tmpDir = baseCtx.getTempDirForInterimJobPath(fileSinkDesc.getDestPath());
+    } else {
+      tmpDir = baseCtx.getTempDirForFinalJobPath(fileSinkDesc.getDestPath());
+    }
+    DynamicPartitionCtx dpCtx = fileSinkDesc.getDynPartCtx();
+    if (dpCtx != null && dpCtx.getSPPath() != null) {
+      tmpDir = new Path(tmpDir, dpCtx.getSPPath());
+    }

     // Change all the linked file sink descriptors
     if (fileSinkDesc.isLinkedFileSink()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 89852816977..93d70100334 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -7259,37 +7259,12 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {

       checkImmutableTable(qb, destinationTable, destinationPath, false);

-      // check for partition
-      List<FieldSchema> parts = destinationTable.getPartitionKeys();
-      if (parts != null && parts.size() > 0) { // table is partitioned
-        if (partSpec == null || partSpec.size() == 0) { // user did NOT specify partition
-          throw new SemanticException(generateErrorMessage(
-              qb.getParseInfo().getDestForClause(dest),
-              ErrorMsg.NEED_PARTITION_ERROR.getMsg()));
-        }
-        dpCtx = qbm.getDPCtx(dest);
-        if (dpCtx == null) {
-          destinationTable.validatePartColumnNames(partSpec, false);
-          dpCtx = new DynamicPartitionCtx(partSpec,
-              conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
-              conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE));
-          qbm.setDPCtx(dest, dpCtx);
-        }
-      }
-
       // Check for dynamic partitions.
       dpCtx = checkDynPart(qb, qbm, destinationTable, partSpec, dest);
-      if (dpCtx != null && dpCtx.getSPPath() != null) {
-        destinationPath = new Path(destinationTable.getPath(), dpCtx.getSPPath());
-      }

       boolean isNonNativeTable = destinationTable.isNonNative();
       isMmTable = AcidUtils.isInsertOnlyTable(destinationTable.getParameters());
-      if (isNonNativeTable || isMmTable) {
-        queryTmpdir = destinationPath;
-      } else {
-        queryTmpdir = ctx.getTempDirForFinalJobPath(destinationPath);
-      }
+      queryTmpdir = getTmpDir(isNonNativeTable, isMmTable, false, destinationPath, dpCtx);
       if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
         Utilities.FILE_OP_LOGGER.trace("create filesink w/DEST_TABLE specifying " + queryTmpdir
             + " from " + destinationPath);
@@ -7398,7 +7373,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
           .getAuthority(), partPath.toUri().getPath());

       isMmTable = AcidUtils.isInsertOnlyTable(destinationTable.getParameters());
-      queryTmpdir = isMmTable ? destinationPath : ctx.getTempDirForFinalJobPath(destinationPath);
+      queryTmpdir = getTmpDir(false, isMmTable, false, destinationPath, null);
       if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
         Utilities.FILE_OP_LOGGER.trace("create filesink w/DEST_PARTITION specifying "
             + queryTmpdir + " from " + destinationPath);
@@ -7514,7 +7489,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         // no copy is required. we may want to revisit this policy in future
         try {
           Path qPath = FileUtils.makeQualified(destinationPath, conf);
-          queryTmpdir = isMmTable ? qPath : ctx.getTempDirForFinalJobPath(qPath);
+          queryTmpdir = getTmpDir(false, isMmTable, false, qPath, null);
           if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
             Utilities.FILE_OP_LOGGER.trace("Setting query directory " + queryTmpdir
                 + " from " + destinationPath + " (" + isMmTable + ")");
@@ -7781,6 +7756,28 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     return output;
   }

+  private Path getTmpDir(boolean isNonNativeTable, boolean isMmTable, boolean isDirectInsert,
+      Path destinationPath, DynamicPartitionCtx dpCtx) {
+    /**
+     * We will directly insert to the final destination in the following cases:
+     * 1. Non native table
+     * 2. Micro-managed (insert only table)
+     * 3. Full ACID table and operation type is INSERT
+     */
+    Path destPath = null;
+    if (isNonNativeTable || isMmTable || isDirectInsert) {
+      destPath = destinationPath;
+    } else if (HiveConf.getBoolVar(conf, ConfVars.HIVE_USE_SCRATCHDIR_FOR_STAGING)) {
+      destPath = ctx.getTempDirForInterimJobPath(destinationPath);
+    } else {
+      destPath = ctx.getTempDirForFinalJobPath(destinationPath);
+    }
+    if (dpCtx != null && dpCtx.getSPPath() != null) {
+      return new Path(destPath, dpCtx.getSPPath());
+    }
+    return destPath;
+  }
+
   private boolean useBatchingSerializer(String serdeClassName) {
     return SessionState.get().isHiveServerQuery() &&
       hasSetBatchSerializer(serdeClassName);