HIVE-14640: handle hive.merge.*files in select queries (Sergey Shelukhin)
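Most of this patch relocates the micro-managed (MM) table commit logic from FileSinkOperator into Utilities (writeMmCommitManifest / handleMmTableFinalPath) so the file-merge operators can share it. The manifest format itself is simple: an int count followed by one writeUTF entry per committed path, read back at job close with a duplicate check. Below is a minimal standalone sketch of that round trip, assuming local java.io byte-array streams in place of the FSDataOutputStream/FSDataInputStream the diff uses against HDFS; the class name and sample path are illustrative only.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Minimal sketch of the MM commit-manifest round trip from this patch:
// writeMmCommitManifest writes an int count followed by one writeUTF'd
// path per committed file; handleMmTableFinalPath reads the manifests back
// and rejects any path that appears in more than one manifest.
// Local byte-array streams stand in for the HDFS streams used in the diff.
public class MmManifestSketch {

  static byte[] writeManifest(List<String> commitPaths) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(commitPaths.size());          // file count header
      for (String path : commitPaths) {
        out.writeUTF(path);                      // one entry per committed file
      }
    }
    return bytes.toByteArray();
  }

  static void readManifest(byte[] manifest, Set<String> committed) throws IOException {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(manifest))) {
      int fileCount = in.readInt();
      for (int i = 0; i < fileCount; ++i) {
        String nextFile = in.readUTF();
        // Same invariant as handleMmTableFinalPath: a file committed twice is an error.
        if (!committed.add(nextFile)) {
          throw new IOException(nextFile + " was specified in multiple manifests");
        }
      }
    }
  }

  public static void main(String[] args) throws IOException {
    byte[] m = writeManifest(Arrays.asList("/warehouse/t/mm_1/000000_0.merged"));
    Set<String> committed = new HashSet<>();
    readManifest(m, committed);
    System.out.println("Committed: " + committed);
  }
}

At job close, any file found under the write-id directory that is not in the committed set is deleted, so the output of a failed or uncommitted task is discarded without the rename-based commit used for non-MM tables.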
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/eacf9f9b Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/eacf9f9b Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/eacf9f9b Branch: refs/heads/hive-14535 Commit: eacf9f9b6d7405b68def88ffc5fd755222375efc Parents: bd78d66 Author: Sergey Shelukhin <ser...@apache.org> Authored: Thu Oct 13 17:18:46 2016 -0700 Committer: Sergey Shelukhin <ser...@apache.org> Committed: Thu Oct 13 17:18:46 2016 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 2 +- .../hive/ql/exec/AbstractFileMergeOperator.java | 181 +++++++++------ .../hadoop/hive/ql/exec/FileSinkOperator.java | 187 ++------------- .../apache/hadoop/hive/ql/exec/MoveTask.java | 8 +- .../hive/ql/exec/OrcFileMergeOperator.java | 11 +- .../hive/ql/exec/RCFileMergeOperator.java | 3 +- .../apache/hadoop/hive/ql/exec/Utilities.java | 225 ++++++++++++++++-- .../rcfile/truncate/ColumnTruncateMapper.java | 1 + .../apache/hadoop/hive/ql/metadata/Hive.java | 1 + .../hive/ql/optimizer/GenMapRedUtils.java | 214 +++++++++-------- .../hive/ql/parse/DDLSemanticAnalyzer.java | 9 +- .../hadoop/hive/ql/parse/GenTezUtils.java | 4 +- .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 4 +- .../ql/plan/ConditionalResolverMergeFiles.java | 17 +- .../hadoop/hive/ql/plan/FileMergeDesc.java | 9 + .../hadoop/hive/ql/plan/FileSinkDesc.java | 14 +- .../apache/hadoop/hive/ql/plan/MoveWork.java | 10 +- ql/src/test/queries/clientpositive/mm_all.q | 57 +++-- ql/src/test/queries/clientpositive/mm_current.q | 40 +--- .../results/clientpositive/llap/mm_all.q.out | 232 +++++++++++++++---- .../clientpositive/llap/mm_current.q.out | 165 +------------ 21 files changed, 758 insertions(+), 636 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index c89142c..6201c04 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3122,7 +3122,7 @@ public class HiveConf extends Configuration { HIVE_METASTORE_MM_HEARTBEAT_TIMEOUT("hive.metastore.mm.heartbeat.timeout", "1800s", new TimeValidator(TimeUnit.SECONDS), - "MM write ID times out after this long if a heartbeat is not send. Currently disabled."), // TODO# heartbeating not implemented + "MM write ID times out after this long if a heartbeat is not send. 
Currently disabled."), HIVE_METASTORE_MM_ABSOLUTE_TIMEOUT("hive.metastore.mm.absolute.timeout", "7d", new TimeValidator(TimeUnit.SECONDS), http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java index 40c784b..dedbb78 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java @@ -34,6 +34,8 @@ import org.apache.hadoop.mapred.JobConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Lists; + /** * Fast file merge operator for ORC and RCfile. This is an abstract class which * does not process any rows. Refer {@link org.apache.hadoop.hive.ql.exec.OrcFileMergeOperator} @@ -47,20 +49,21 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc> protected JobConf jc; protected FileSystem fs; - protected boolean autoDelete; - protected boolean exception; - protected Path outPath; - protected Path finalPath; - protected Path dpPath; - protected Path tmpPath; - protected Path taskTmpPath; - protected int listBucketingDepth; - protected boolean hasDynamicPartitions; - protected boolean isListBucketingAlterTableConcatenate; - protected boolean tmpPathFixedConcatenate; - protected boolean tmpPathFixed; - protected Set<Path> incompatFileSet; - protected transient DynamicPartitionCtx dpCtx; + private boolean autoDelete; + private Path outPath; // The output path used by the subclasses. + private Path finalPath; // Used as a final destination; same as outPath for MM tables. + private Path dpPath; + private Path tmpPath; // Only stored to update based on the original in fixTmpPath. + private Path taskTmpPath; // Only stored to update based on the original in fixTmpPath. + private int listBucketingDepth; + private boolean hasDynamicPartitions; + private boolean isListBucketingAlterTableConcatenate; + private boolean tmpPathFixedConcatenate; + private boolean tmpPathFixed; + private Set<Path> incompatFileSet; + private transient DynamicPartitionCtx dpCtx; + private boolean isMmTable; + private String taskId; /** Kryo ctor. */ protected AbstractFileMergeOperator() { @@ -77,39 +80,50 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc> this.jc = new JobConf(hconf); incompatFileSet = new HashSet<Path>(); autoDelete = false; - exception = false; tmpPathFixed = false; tmpPathFixedConcatenate = false; - outPath = null; - finalPath = null; dpPath = null; - tmpPath = null; - taskTmpPath = null; dpCtx = conf.getDpCtx(); hasDynamicPartitions = conf.hasDynamicPartitions(); isListBucketingAlterTableConcatenate = conf .isListBucketingAlterTableConcatenate(); listBucketingDepth = conf.getListBucketingDepth(); Path specPath = conf.getOutputPath(); - updatePaths(Utilities.toTempPath(specPath), - Utilities.toTaskTempPath(specPath)); + isMmTable = conf.getMmWriteId() != null; + if (isMmTable) { + updatePaths(specPath, null); + } else { + updatePaths(Utilities.toTempPath(specPath), Utilities.toTaskTempPath(specPath)); + } try { fs = specPath.getFileSystem(hconf); - autoDelete = fs.deleteOnExit(outPath); + if (!isMmTable) { + // Do not delete for MM tables. 
We either want the file if we succeed, or we must + // delete is explicitly before proceeding if the merge fails. + autoDelete = fs.deleteOnExit(outPath); + } } catch (IOException e) { - this.exception = true; - throw new HiveException("Failed to initialize AbstractFileMergeOperator", - e); + throw new HiveException("Failed to initialize AbstractFileMergeOperator", e); } } // sets up temp and task temp path private void updatePaths(Path tp, Path ttp) { - String taskId = Utilities.getTaskId(jc); + if (taskId == null) { + taskId = Utilities.getTaskId(jc); + } tmpPath = tp; - taskTmpPath = ttp; - finalPath = new Path(tp, taskId); - outPath = new Path(ttp, Utilities.toTempPath(taskId)); + if (isMmTable) { + taskTmpPath = null; + // Make sure we don't collide with the source. + outPath = finalPath = new Path(tmpPath, taskId + ".merged"); + } else { + taskTmpPath = ttp; + finalPath = new Path(tp, taskId); + outPath = new Path(ttp, Utilities.toTempPath(taskId)); + } + Utilities.LOG14535.info("Paths for merge " + taskId + ": tmp " + tmpPath + ", task " + + taskTmpPath + ", final " + finalPath + ", out " + outPath, new Exception()); } /** @@ -142,7 +156,7 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc> protected void fixTmpPath(Path inputPath, int depthDiff) throws IOException { // don't need to update tmp paths when there is no depth difference in paths - if (depthDiff <=0) { + if (depthDiff <= 0) { return; } @@ -157,10 +171,12 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc> } Path newTmpPath = new Path(tmpPath, newPath); - Path newTaskTmpPath = new Path(taskTmpPath, newPath); if (!fs.exists(newTmpPath)) { + Utilities.LOG14535.info("Creating " + newTmpPath); fs.mkdirs(newTmpPath); } + + Path newTaskTmpPath = (taskTmpPath != null) ? new Path(taskTmpPath, newPath) : null; updatePaths(newTmpPath, newTaskTmpPath); } @@ -182,7 +198,7 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc> } protected void fixTmpPath(Path path) throws IOException { - + Utilities.LOG14535.info("Calling fixTmpPath with " + path); // Fix temp path for alter table ... concatenate if (isListBucketingAlterTableConcatenate) { if (this.tmpPathFixedConcatenate) { @@ -208,38 +224,49 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc> @Override public void closeOp(boolean abort) throws HiveException { try { - if (!abort) { - // if outPath does not exist, then it means all paths within combine split are skipped as - // they are incompatible for merge (for example: files without stripe stats). - // Those files will be added to incompatFileSet - if (fs.exists(outPath)) { - FileStatus fss = fs.getFileStatus(outPath); + if (abort) { + if (!autoDelete || isMmTable) { + fs.delete(outPath, true); + } + return; + } + // if outPath does not exist, then it means all paths within combine split are skipped as + // they are incompatible for merge (for example: files without stripe stats). + // Those files will be added to incompatFileSet + if (fs.exists(outPath)) { + FileStatus fss = fs.getFileStatus(outPath); + if (!isMmTable) { if (!fs.rename(outPath, finalPath)) { - throw new IOException( - "Unable to rename " + outPath + " to " + finalPath); + throw new IOException("Unable to rename " + outPath + " to " + finalPath); } - LOG.info("renamed path " + outPath + " to " + finalPath + " . 
File" + - " size is " - + fss.getLen()); + LOG.info("Renamed path " + outPath + " to " + finalPath + + "(" + fss.getLen() + " bytes)."); + } else { + assert finalPath.equals(outPath); + // There's always just one file that we have merged. + // The union/DP/etc. should already be account for in the path. + Utilities.writeMmCommitManifest(Lists.newArrayList(outPath), + tmpPath.getParent(), fs, taskId, conf.getMmWriteId(), null); + LOG.info("Merged into " + finalPath + "(" + fss.getLen() + " bytes)."); } + } - // move any incompatible files to final path - if (incompatFileSet != null && !incompatFileSet.isEmpty()) { - for (Path incompatFile : incompatFileSet) { - Path destDir = finalPath.getParent(); - try { - Utilities.renameOrMoveFiles(fs, incompatFile, destDir); - LOG.info("Moved incompatible file " + incompatFile + " to " + - destDir); - } catch (HiveException e) { - LOG.error("Unable to move " + incompatFile + " to " + destDir); - throw new IOException(e); - } - } + // move any incompatible files to final path + if (incompatFileSet != null && !incompatFileSet.isEmpty()) { + if (isMmTable) { + // We only support query-time merge for MM tables, so don't handle this. + throw new HiveException("Incompatible files should not happen in MM tables."); } - } else { - if (!autoDelete) { - fs.delete(outPath, true); + for (Path incompatFile : incompatFileSet) { + Path destDir = finalPath.getParent(); + try { + Utilities.renameOrMoveFiles(fs, incompatFile, destDir); + LOG.info("Moved incompatible file " + incompatFile + " to " + + destDir); + } catch (HiveException e) { + LOG.error("Unable to move " + incompatFile + " to " + destDir); + throw new IOException(e); + } } } } catch (IOException e) { @@ -253,16 +280,26 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc> try { Path outputDir = conf.getOutputPath(); FileSystem fs = outputDir.getFileSystem(hconf); - Path backupPath = backupOutputPath(fs, outputDir); - // TODO# merge-related move - Utilities.mvFileToFinalPath(outputDir, hconf, success, LOG, conf.getDpCtx(), - null, reporter); - if (success) { - LOG.info("jobCloseOp moved merged files to output dir: " + outputDir); - } - if (backupPath != null) { - fs.delete(backupPath, true); + Long mmWriteId = conf.getMmWriteId(); + if (mmWriteId == null) { + Path backupPath = backupOutputPath(fs, outputDir); + Utilities.mvFileToFinalPath( + outputDir, hconf, success, LOG, conf.getDpCtx(), null, reporter); + if (success) { + LOG.info("jobCloseOp moved merged files to output dir: " + outputDir); + } + if (backupPath != null) { + fs.delete(backupPath, true); + } + } else { + int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(), + lbLevels = conf.getListBucketingDepth(); + // We don't expect missing buckets from mere (actually there should be no buckets), + // so just pass null as bucketing context. Union suffix should also be accounted for. 
+ Utilities.handleMmTableFinalPath(outputDir.getParent(), null, hconf, success, + dpLevels, lbLevels, null, mmWriteId, reporter); } + } catch (IOException e) { throw new HiveException("Failed jobCloseOp for AbstractFileMergeOperator", e); @@ -290,4 +327,12 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc> public static String getOperatorName() { return "MERGE"; } + + protected final Path getOutPath() { + return outPath; + } + + protected final void addIncompatibleFile(Path path) { + incompatFileSet.add(path); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 5902036..dda4b51 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.Utilities.MissingBucketsContext; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.HiveKey; @@ -98,7 +99,6 @@ import com.google.common.collect.Lists; public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements Serializable { - private static final String MANIFEST_EXTENSION = ".manifest"; public static final Logger LOG = LoggerFactory.getLogger(FileSinkOperator.class); private static final boolean isInfoEnabled = LOG.isInfoEnabled(); private static final boolean isDebugEnabled = LOG.isDebugEnabled(); @@ -1128,26 +1128,9 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements fsp.commit(fs, commitPaths); } } - if (!commitPaths.isEmpty()) { - Path manifestPath = getManifestDir(specPath, childSpecPathDynLinkedPartitions); - manifestPath = new Path(manifestPath, "_tmp." + ValidWriteIds.getMmFilePrefix( - conf.getMmWriteId()) + "_" + taskId + MANIFEST_EXTENSION); - Utilities.LOG14535.info("Writing manifest to " + manifestPath + " with " + commitPaths); - try { - // Don't overwrite the manifest... should fail if we have collisions. - // We assume one FSOP per task (per specPath), so we create it in specPath. - try (FSDataOutputStream out = fs.create(manifestPath, false)) { - if (out == null) { - throw new HiveException("Failed to create manifest at " + manifestPath); - } - out.writeInt(commitPaths.size()); - for (Path path : commitPaths) { - out.writeUTF(path.toString()); - } - } - } catch (IOException e) { - throw new HiveException(e); - } + if (conf.getMmWriteId() != null) { + Utilities.writeMmCommitManifest(commitPaths, specPath, fs, taskId, conf.getMmWriteId(), + childSpecPathDynLinkedPartitions); } // Only publish stats if this operator's flag was set to gather stats if (conf.isGatherStats()) { @@ -1165,9 +1148,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements super.closeOp(abort); } - private static Path getManifestDir(Path specPath, String unionSuffix) { - return (unionSuffix == null) ? 
specPath : new Path(specPath, unionSuffix); - } /** * @return the name of the operator @@ -1196,9 +1176,17 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements unionSuffix = conf.getDirName().getName(); } if (!conf.isMmTable()) { - Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf, reporter); // TODO# other callers + Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf, reporter); } else { - handleMmTable(specPath, unionSuffix, hconf, success, dpCtx, lbCtx, conf, reporter); + int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(), + lbLevels = lbCtx.calculateListBucketingLevel(); + // TODO: why is it stored in both? + int numBuckets = (conf.getTable() != null) ? conf.getTable().getNumBuckets() + : (dpCtx != null ? dpCtx.getNumBuckets() : 0); + MissingBucketsContext mbc = new MissingBucketsContext( + conf.getTableInfo(), numBuckets, conf.getCompressed()); + Utilities.handleMmTableFinalPath(specPath, unionSuffix, hconf, success, + dpLevels, lbLevels, mbc, conf.getMmWriteId(), reporter); } } } catch (IOException e) { @@ -1207,152 +1195,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements super.jobCloseOp(hconf, success); } - private static FileStatus[] getMmDirectoryCandidates(FileSystem fs, Path path, - DynamicPartitionCtx dpCtx, ListBucketingCtx lbCtx, String unionSuffix, PathFilter filter) - throws IOException { - StringBuilder sb = new StringBuilder(path.toUri().getPath()); - int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(), - lbLevels = lbCtx == null ? 0 : lbCtx.getSkewedColNames().size(); - for (int i = 0; i < dpLevels + lbLevels; i++) { - sb.append(Path.SEPARATOR).append("*"); - } - if (unionSuffix != null) { - sb.append(Path.SEPARATOR).append(unionSuffix); - } - sb.append(Path.SEPARATOR).append("*"); // TODO: we could add exact mm prefix here - Utilities.LOG14535.info("Looking for files via: " + sb.toString()); - Path pathPattern = new Path(path, sb.toString()); - return fs.globStatus(pathPattern, filter); - } - - private void handleMmTable(Path specPath, String unionSuffix, Configuration hconf, - boolean success, DynamicPartitionCtx dpCtx, ListBucketingCtx lbCtx, FileSinkDesc conf, - Reporter reporter) throws IOException, HiveException { - FileSystem fs = specPath.getFileSystem(hconf); - // Manifests would be at the root level, but the results at target level. 
- // TODO# special case - doesn't take bucketing into account - Path manifestDir = getManifestDir(specPath, unionSuffix); - - ValidWriteIds.IdPathFilter filter = new ValidWriteIds.IdPathFilter(conf.getMmWriteId(), true); - if (!success) { - tryDeleteAllMmFiles(fs, specPath, manifestDir, dpCtx, lbCtx, unionSuffix, filter); - return; - } - FileStatus[] files = HiveStatsUtils.getFileStatusRecurse(manifestDir, 1, fs, filter); - Utilities.LOG14535.info("Looking for manifests in: " + manifestDir); - List<Path> manifests = new ArrayList<>(); - if (files != null) { - for (FileStatus status : files) { - Path path = status.getPath(); - if (path.getName().endsWith(MANIFEST_EXTENSION)) { - Utilities.LOG14535.info("Reading manifest " + path); - manifests.add(path); - } - } - } - - Utilities.LOG14535.info("Looking for files in: " + specPath); - files = getMmDirectoryCandidates(fs, specPath, dpCtx, lbCtx, unionSuffix, filter); - ArrayList<FileStatus> results = new ArrayList<>(); - if (files != null) { - for (FileStatus status : files) { - Path path = status.getPath(); - Utilities.LOG14535.info("Looking at path: " + path + " from " + System.identityHashCode(this)); - if (!status.isDirectory()) { - if (!path.getName().endsWith(MANIFEST_EXTENSION)) { - Utilities.LOG14535.warn("Unknown file found, deleting: " + path); - tryDelete(fs, path); - } - } else { - results.add(status); - } - } - } - - HashSet<String> committed = new HashSet<>(); - for (Path mfp : manifests) { - try (FSDataInputStream mdis = fs.open(mfp)) { - int fileCount = mdis.readInt(); - for (int i = 0; i < fileCount; ++i) { - String nextFile = mdis.readUTF(); - if (!committed.add(nextFile)) { - throw new HiveException(nextFile + " was specified in multiple manifests"); - } - } - } - } - - for (FileStatus status : results) { - for (FileStatus child : fs.listStatus(status.getPath())) { - Path childPath = child.getPath(); - if (committed.remove(childPath.toString())) continue; // A good file. - Utilities.LOG14535.info("Deleting " + childPath + " that was not committed"); - // We should actually succeed here - if we fail, don't commit the query. - if (!fs.delete(childPath, true)) { - throw new HiveException("Failed to delete an uncommitted path " + childPath); - } - } - } - - if (!committed.isEmpty()) { - throw new HiveException("The following files were committed but not found: " + committed); - } - for (Path mfp : manifests) { - Utilities.LOG14535.info("Deleting manifest " + mfp); - tryDelete(fs, mfp); - } - // Delete the manifest directory if we only created it for manifests; otherwise the - // dynamic partition loader will find it and try to load it as a partition... what a mess. - if (manifestDir != specPath) { - FileStatus[] remainingFiles = fs.listStatus(manifestDir); - if (remainingFiles == null || remainingFiles.length == 0) { - Utilities.LOG14535.info("Deleting directory " + manifestDir); - tryDelete(fs, manifestDir); - } - } - - if (results.isEmpty()) return; - - // TODO: see HIVE-14886 - removeTempOrDuplicateFiles is broken for list bucketing, - // so maintain parity here by not calling it at all. 
- if (lbCtx != null) return; - FileStatus[] finalResults = results.toArray(new FileStatus[results.size()]); - List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles( - fs, finalResults, dpCtx, conf, hconf); - // create empty buckets if necessary - if (emptyBuckets.size() > 0) { - Utilities.createEmptyBuckets(hconf, emptyBuckets, conf, reporter); - } - } - - private void tryDeleteAllMmFiles(FileSystem fs, Path specPath, Path manifestDir, - DynamicPartitionCtx dpCtx, ListBucketingCtx lbCtx, String unionSuffix, - ValidWriteIds.IdPathFilter filter) throws IOException { - FileStatus[] files = getMmDirectoryCandidates(fs, specPath, dpCtx, lbCtx, unionSuffix, filter); - if (files != null) { - for (FileStatus status : files) { - Utilities.LOG14535.info("Deleting " + status.getPath() + " on failure"); - tryDelete(fs, status.getPath()); - } - } - files = HiveStatsUtils.getFileStatusRecurse(manifestDir, 1, fs, filter); - if (files != null) { - for (FileStatus status : files) { - Utilities.LOG14535.info("Deleting " + status.getPath() + " on failure"); - tryDelete(fs, status.getPath()); - } - } - } - - - private void tryDelete(FileSystem fs, Path path) { - try { - fs.delete(path, true); - } catch (IOException ex) { - LOG.error("Failed to delete " + path, ex); - } - } - @Override public OperatorType getType() { return OperatorType.FILESINK; @@ -1427,7 +1269,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements for (Map.Entry<String, FSPaths> entry : valToPaths.entrySet()) { String fspKey = entry.getKey(); // DP/LB FSPaths fspValue = entry.getValue(); - // TODO# useful code as reference, as it takes apart the crazy paths // for bucketed tables, hive.optimize.sort.dynamic.partition optimization // adds the taskId to the fspKey. if (conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) { http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index 9bc4836..f2b8ca3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -256,6 +256,9 @@ public class MoveTask extends Task<MoveWork> implements Serializable { @Override public int execute(DriverContext driverContext) { + Utilities.LOG14535.info("Executing MoveWork " + System.identityHashCode(work) + + " with " + work.getLoadFileWork() + "; " + work.getLoadTableWork() + "; " + + work.getLoadMultiFilesWork(), new Exception()); try { if (driverContext.getCtx().getExplainAnalyze() == AnalyzeState.RUNNING) { @@ -315,15 +318,14 @@ public class MoveTask extends Task<MoveWork> implements Serializable { boolean isAcid = work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID; if (tbd.isMmTable() && isAcid) { - // TODO# need to make sure ACID writes to final directories. Otherwise, might need to move. 
- throw new HiveException("ACID and MM are not supported"); + throw new HiveException("ACID and MM are not supported"); } // Create a data container DataContainer dc = null; if (tbd.getPartitionSpec().size() == 0) { dc = new DataContainer(table.getTTable()); - Utilities.LOG14535.info("loadTable called from " + tbd.getSourcePath() + " into " + tbd.getTable()); + Utilities.LOG14535.info("loadTable called from " + tbd.getSourcePath() + " into " + tbd.getTable().getTableName(), new Exception()); db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getReplace(), work.isSrcLocal(), isSkewedStoredAsDirs(tbd), isAcid, hasFollowingStatsTask(), tbd.getMmWriteId()); http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java index e3cb765..835791b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java @@ -75,6 +75,7 @@ public class OrcFileMergeOperator extends private void processKeyValuePairs(Object key, Object value) throws HiveException { String filePath = ""; + boolean exception = false; try { OrcFileValueWrapper v; OrcFileKeyWrapper k; @@ -87,12 +88,15 @@ public class OrcFileMergeOperator extends // skip incompatible file, files that are missing stripe statistics are set to incompatible if (k.isIncompatFile()) { LOG.warn("Incompatible ORC file merge! Stripe statistics is missing. " + k.getInputPath()); - incompatFileSet.add(k.getInputPath()); + addIncompatibleFile(k.getInputPath()); return; } filePath = k.getInputPath().toUri().getPath(); + Utilities.LOG14535.info("OrcFileMergeOperator processing " + filePath, new Exception()); + + fixTmpPath(k.getInputPath().getParent()); v = (OrcFileValueWrapper) value; @@ -126,6 +130,7 @@ public class OrcFileMergeOperator extends options.bufferSize(compressBuffSize).enforceBufferSize(); } + Path outPath = getOutPath(); outWriter = OrcFile.createWriter(outPath, options); if (isLogDebugEnabled) { LOG.info("ORC merge file output path: " + outPath); @@ -133,7 +138,7 @@ public class OrcFileMergeOperator extends } if (!checkCompatibility(k)) { - incompatFileSet.add(k.getInputPath()); + addIncompatibleFile(k.getInputPath()); return; } @@ -164,7 +169,7 @@ public class OrcFileMergeOperator extends outWriter.appendUserMetadata(v.getUserMetadata()); } } catch (Throwable e) { - this.exception = true; + exception = true; LOG.error("Closing operator..Exception: " + ExceptionUtils.getStackTrace(e)); throw new HiveException(e); } finally { http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java index 4dea1d2..349b459 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java @@ -77,7 +77,7 @@ public class RCFileMergeOperator codec = key.getCodec(); columnNumber = key.getKeyBuffer().getColumnNumber(); RCFileOutputFormat.setColumnNumber(jc, columnNumber); - outWriter = new RCFile.Writer(fs, jc, 
outPath, null, codec); + outWriter = new RCFile.Writer(fs, jc, getOutPath(), null, codec); } boolean sameCodec = ((codec == key.getCodec()) || codec.getClass().equals( @@ -94,7 +94,6 @@ public class RCFileMergeOperator key.getRecordLength(), key.getKeyLength(), key.getCompressedKeyLength()); } catch (Throwable e) { - this.exception = true; closeOp(true); throw new HiveException(e); } http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index d343e32..49bdd84 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -83,6 +83,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -94,6 +96,7 @@ import org.apache.hadoop.hive.common.HiveInterruptUtils; import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.common.ValidWriteIds; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.MetaStoreUtils; @@ -146,6 +149,7 @@ import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; +import org.apache.hadoop.hive.ql.plan.ListBucketingCtx; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.MergeJoinWork; @@ -1411,7 +1415,7 @@ public final class Utilities { boolean success, Logger log, DynamicPartitionCtx dpCtx, FileSinkDesc conf, Reporter reporter) throws IOException, HiveException { - + FileSystem fs = specPath.getFileSystem(hconf); Path tmpPath = Utilities.toTempPath(specPath); Path taskTmpPath = Utilities.toTaskTempPath(specPath); @@ -1422,12 +1426,14 @@ public final class Utilities { PerfLogger perfLogger = SessionState.getPerfLogger(); perfLogger.PerfLogBegin("FileSinkOperator", "RemoveTempOrDuplicateFiles"); // remove any tmp file or double-committed output files - List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(fs, statuses, dpCtx, conf, hconf); + List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles( + fs, statuses, dpCtx, conf, hconf); perfLogger.PerfLogEnd("FileSinkOperator", "RemoveTempOrDuplicateFiles"); // create empty buckets if necessary if (emptyBuckets.size() > 0) { perfLogger.PerfLogBegin("FileSinkOperator", "CreateEmptyBuckets"); - createEmptyBuckets(hconf, emptyBuckets, conf, reporter); + createEmptyBuckets( + hconf, emptyBuckets, conf.getCompressed(), conf.getTableInfo(), reporter); perfLogger.PerfLogEnd("FileSinkOperator", "CreateEmptyBuckets"); } // move to the file destination @@ -1457,7 +1463,7 @@ public final class Utilities { * @throws IOException */ static void createEmptyBuckets(Configuration hconf, 
List<Path> paths, - FileSinkDesc conf, Reporter reporter) + boolean isCompressed, TableDesc tableInfo, Reporter reporter) throws HiveException, IOException { JobConf jc; @@ -1469,13 +1475,11 @@ public final class Utilities { } HiveOutputFormat<?, ?> hiveOutputFormat = null; Class<? extends Writable> outputClass = null; - boolean isCompressed = conf.getCompressed(); - TableDesc tableInfo = conf.getTableInfo(); try { Serializer serializer = (Serializer) tableInfo.getDeserializerClass().newInstance(); serializer.initialize(null, tableInfo.getProperties()); outputClass = serializer.getSerializedClass(); - hiveOutputFormat = HiveFileFormatUtils.getHiveOutputFormat(hconf, conf.getTableInfo()); + hiveOutputFormat = HiveFileFormatUtils.getHiveOutputFormat(hconf, tableInfo); } catch (SerDeException e) { throw new HiveException(e); } catch (InstantiationException e) { @@ -1518,13 +1522,21 @@ public final class Utilities { */ public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats, DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf) throws IOException { + int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(), + numBuckets = (conf != null && conf.getTable() != null) + ? conf.getTable().getNumBuckets() : 0; + return removeTempOrDuplicateFiles(fs, fileStats, dpLevels, numBuckets, hconf); + } + + public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats, + int dpLevels, int numBuckets, Configuration hconf) throws IOException { if (fileStats == null) { return null; } List<Path> result = new ArrayList<Path>(); HashMap<String, FileStatus> taskIDToFile = null; - if (dpCtx != null) { + if (dpLevels > 0) { FileStatus parts[] = fileStats; for (int i = 0; i < parts.length; ++i) { @@ -1543,14 +1555,14 @@ public final class Utilities { taskIDToFile = removeTempOrDuplicateFiles(items, fs); // if the table is bucketed and enforce bucketing, we should check and generate all buckets - if (dpCtx.getNumBuckets() > 0 && taskIDToFile != null && !"tez".equalsIgnoreCase(hconf.get(ConfVars.HIVE_EXECUTION_ENGINE.varname))) { + if (numBuckets > 0 && taskIDToFile != null && !"tez".equalsIgnoreCase(hconf.get(ConfVars.HIVE_EXECUTION_ENGINE.varname))) { // refresh the file list items = fs.listStatus(parts[i].getPath()); // get the missing buckets and generate empty buckets String taskID1 = taskIDToFile.keySet().iterator().next(); Path bucketPath = taskIDToFile.values().iterator().next().getPath(); Utilities.LOG14535.info("Bucket path " + bucketPath); - for (int j = 0; j < dpCtx.getNumBuckets(); ++j) { + for (int j = 0; j < numBuckets; ++j) { addBucketFileIfMissing(result, taskIDToFile, taskID1, bucketPath, j); } } @@ -1561,13 +1573,13 @@ public final class Utilities { return result; } taskIDToFile = removeTempOrDuplicateFiles(items, fs); - if(taskIDToFile != null && taskIDToFile.size() > 0 && conf != null && conf.getTable() != null - && (conf.getTable().getNumBuckets() > taskIDToFile.size()) && !"tez".equalsIgnoreCase(hconf.get(ConfVars.HIVE_EXECUTION_ENGINE.varname))) { + if(taskIDToFile != null && taskIDToFile.size() > 0 && (numBuckets > taskIDToFile.size()) + && !"tez".equalsIgnoreCase(hconf.get(ConfVars.HIVE_EXECUTION_ENGINE.varname))) { // get the missing buckets and generate empty buckets for non-dynamic partition String taskID1 = taskIDToFile.keySet().iterator().next(); Path bucketPath = taskIDToFile.values().iterator().next().getPath(); Utilities.LOG14535.info("Bucket path " + bucketPath); - for (int j = 0; j < 
conf.getTable().getNumBuckets(); ++j) { + for (int j = 0; j < numBuckets; ++j) { addBucketFileIfMissing(result, taskIDToFile, taskID1, bucketPath, j); } } @@ -3746,4 +3758,191 @@ public final class Utilities { String suffix = "KMGTPE".charAt(exp-1) + ""; return String.format("%.2f%sB", bytes / Math.pow(unit, exp), suffix); } + + private static final String MANIFEST_EXTENSION = ".manifest"; + + private static Path getManifestDir(Path specPath, String unionSuffix) { + return (unionSuffix == null) ? specPath : new Path(specPath, unionSuffix); + } + + private static void tryDelete(FileSystem fs, Path path) { + try { + fs.delete(path, true); + } catch (IOException ex) { + LOG.error("Failed to delete " + path, ex); + } + } + + private static FileStatus[] getMmDirectoryCandidates(FileSystem fs, Path path, + int dpLevels, int lbLevels, String unionSuffix, PathFilter filter) throws IOException { + StringBuilder sb = new StringBuilder(path.toUri().getPath()); + for (int i = 0; i < dpLevels + lbLevels; i++) { + sb.append(Path.SEPARATOR).append("*"); + } + if (unionSuffix != null) { + sb.append(Path.SEPARATOR).append(unionSuffix); + } + sb.append(Path.SEPARATOR).append("*"); // TODO: we could add exact mm prefix here + Utilities.LOG14535.info("Looking for files via: " + sb.toString()); + Path pathPattern = new Path(path, sb.toString()); + return fs.globStatus(pathPattern, filter); + } + + private static void tryDeleteAllMmFiles(FileSystem fs, Path specPath, Path manifestDir, + int dpLevels, int lbLevels, String unionSuffix, ValidWriteIds.IdPathFilter filter) + throws IOException { + FileStatus[] files = getMmDirectoryCandidates( + fs, specPath, dpLevels, lbLevels, unionSuffix, filter); + if (files != null) { + for (FileStatus status : files) { + Utilities.LOG14535.info("Deleting " + status.getPath() + " on failure"); + tryDelete(fs, status.getPath()); + } + } + files = HiveStatsUtils.getFileStatusRecurse(manifestDir, 1, fs, filter); + if (files != null) { + for (FileStatus status : files) { + Utilities.LOG14535.info("Deleting " + status.getPath() + " on failure"); + tryDelete(fs, status.getPath()); + } + } + } + + + public static void writeMmCommitManifest(List<Path> commitPaths, Path specPath, FileSystem fs, + String taskId, Long mmWriteId, String unionSuffix) throws HiveException { + if (commitPaths.isEmpty()) return; + Path manifestPath = getManifestDir(specPath, unionSuffix); + manifestPath = new Path(manifestPath, "_tmp." + ValidWriteIds.getMmFilePrefix( + mmWriteId) + "_" + taskId + MANIFEST_EXTENSION); + Utilities.LOG14535.info("Writing manifest to " + manifestPath + " with " + commitPaths); + try { + // Don't overwrite the manifest... should fail if we have collisions. + // We assume one FSOP per task (per specPath), so we create it in specPath. 
+ try (FSDataOutputStream out = fs.create(manifestPath, false)) { + if (out == null) { + throw new HiveException("Failed to create manifest at " + manifestPath); + } + out.writeInt(commitPaths.size()); + for (Path path : commitPaths) { + out.writeUTF(path.toString()); + } + } + } catch (IOException e) { + throw new HiveException(e); + } + } + + public static final class MissingBucketsContext { + public final TableDesc tableInfo; + public final int numBuckets; + public final boolean isCompressed; + public MissingBucketsContext(TableDesc tableInfo, int numBuckets, boolean isCompressed) { + this.tableInfo = tableInfo; + this.numBuckets = numBuckets; + this.isCompressed = isCompressed; + } + } + + public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Configuration hconf, + boolean success, int dpLevels, int lbLevels, MissingBucketsContext mbc, long mmWriteId, + Reporter reporter) throws IOException, HiveException { + FileSystem fs = specPath.getFileSystem(hconf); + // Manifests would be at the root level, but the results at target level. + // TODO# special case - doesn't take bucketing into account + Path manifestDir = getManifestDir(specPath, unionSuffix); + + ValidWriteIds.IdPathFilter filter = new ValidWriteIds.IdPathFilter(mmWriteId, true); + if (!success) { + tryDeleteAllMmFiles(fs, specPath, manifestDir, dpLevels, lbLevels, unionSuffix, filter); + return; + } + FileStatus[] files = HiveStatsUtils.getFileStatusRecurse(manifestDir, 1, fs, filter); + Utilities.LOG14535.info("Looking for manifests in: " + manifestDir + " (" + mmWriteId + ")"); + List<Path> manifests = new ArrayList<>(); + if (files != null) { + for (FileStatus status : files) { + Path path = status.getPath(); + if (path.getName().endsWith(MANIFEST_EXTENSION)) { + Utilities.LOG14535.info("Reading manifest " + path); + manifests.add(path); + } + } + } + + Utilities.LOG14535.info("Looking for files in: " + specPath); + files = getMmDirectoryCandidates(fs, specPath, dpLevels, lbLevels, unionSuffix, filter); + ArrayList<FileStatus> results = new ArrayList<>(); + if (files != null) { + for (FileStatus status : files) { + Path path = status.getPath(); + Utilities.LOG14535.info("Looking at path: " + path); + if (!status.isDirectory()) { + if (!path.getName().endsWith(MANIFEST_EXTENSION)) { + Utilities.LOG14535.warn("Unknown file found, deleting: " + path); + tryDelete(fs, path); + } + } else { + results.add(status); + } + } + } + + HashSet<String> committed = new HashSet<>(); + for (Path mfp : manifests) { + try (FSDataInputStream mdis = fs.open(mfp)) { + int fileCount = mdis.readInt(); + for (int i = 0; i < fileCount; ++i) { + String nextFile = mdis.readUTF(); + if (!committed.add(nextFile)) { + throw new HiveException(nextFile + " was specified in multiple manifests"); + } + } + } + } + + for (FileStatus status : results) { + for (FileStatus child : fs.listStatus(status.getPath())) { + Path childPath = child.getPath(); + if (committed.remove(childPath.toString())) continue; // A good file. + Utilities.LOG14535.info("Deleting " + childPath + " that was not committed"); + // We should actually succeed here - if we fail, don't commit the query. 
+ if (!fs.delete(childPath, true)) { + throw new HiveException("Failed to delete an uncommitted path " + childPath); + } + } + } + + if (!committed.isEmpty()) { + throw new HiveException("The following files were committed but not found: " + committed); + } + for (Path mfp : manifests) { + Utilities.LOG14535.info("Deleting manifest " + mfp); + tryDelete(fs, mfp); + } + // Delete the manifest directory if we only created it for manifests; otherwise the + // dynamic partition loader will find it and try to load it as a partition... what a mess. + if (manifestDir != specPath) { + FileStatus[] remainingFiles = fs.listStatus(manifestDir); + if (remainingFiles == null || remainingFiles.length == 0) { + Utilities.LOG14535.info("Deleting directory " + manifestDir); + tryDelete(fs, manifestDir); + } + } + + if (results.isEmpty()) return; + + // TODO: see HIVE-14886 - removeTempOrDuplicateFiles is broken for list bucketing, + // so maintain parity here by not calling it at all. + if (lbLevels != 0) return; + FileStatus[] finalResults = results.toArray(new FileStatus[results.size()]); + List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles( + fs, finalResults, dpLevels, mbc == null ? 0 : mbc.numBuckets, hconf); + // create empty buckets if necessary + if (emptyBuckets.size() > 0) { + assert mbc != null; + Utilities.createEmptyBuckets(hconf, emptyBuckets, mbc.isCompressed, mbc.tableInfo, reporter); + } + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java index bd537cd..d013c6f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java @@ -234,6 +234,7 @@ public class ColumnTruncateMapper extends MapReduceBase implements ) throws HiveException, IOException { FileSystem fs = outputPath.getFileSystem(job); Path backupPath = backupOutputPath(fs, outputPath, job); + // TODO# special case - what is this about? 
Utilities.mvFileToFinalPath(outputPath, job, success, LOG, dynPartCtx, null, reporter); fs.delete(backupPath, true); http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index e66948f..9a1c1fa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -1842,6 +1842,7 @@ private void constructOneLBLocationMap(FileStatus fSta, if (!s.isDirectory()) { throw new HiveException("partition " + s.getPath() + " is not a directory!"); } + Utilities.LOG14535.info("Found DP " + s.getPath()); validPartitions.add(s.getPath()); } } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index 675bfd0..79ef4d0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -93,6 +93,7 @@ import org.apache.hadoop.hive.ql.plan.FetchWork; import org.apache.hadoop.hive.ql.plan.FileMergeDesc; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.FilterDesc.SampleDesc; +import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; import org.apache.hadoop.hive.ql.plan.LoadFileDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MapredLocalWork; @@ -1256,23 +1257,28 @@ public final class GenMapRedUtils { List<Task<MoveWork>> mvTasks, HiveConf conf, Task<? extends Serializable> currTask) throws SemanticException { + // // 1. create the operator tree // FileSinkDesc fsInputDesc = fsInput.getConf(); + Utilities.LOG14535.info("Creating merge work from " + System.identityHashCode(fsInput) + + " with write ID " + (fsInputDesc.isMmTable() ? fsInputDesc.getMmWriteId() : null) + " into " + finalName); // Create a TableScan operator RowSchema inputRS = fsInput.getSchema(); TableScanOperator tsMerge = GenMapRedUtils.createTemporaryTableScanOperator( fsInput.getCompilationOpContext(), inputRS); + Long srcMmWriteId = fsInputDesc.isMmTable() ? fsInputDesc.getMmWriteId() : null; + // Create a FileSink operator TableDesc ts = (TableDesc) fsInputDesc.getTableInfo().clone(); - // TODO# special case #N - merge FS is created here - FileSinkDesc fsOutputDesc = new FileSinkDesc(finalName, ts, - conf.getBoolVar(ConfVars.COMPRESSRESULT)); - FileSinkOperator fsOutput = (FileSinkOperator) OperatorFactory.getAndMakeChild( - fsOutputDesc, inputRS, tsMerge); + FileSinkDesc fsOutputDesc = new FileSinkDesc( + finalName, ts, conf.getBoolVar(ConfVars.COMPRESSRESULT)); + fsOutputDesc.setMmWriteId(srcMmWriteId); + // Create and attach the filesink for the merge. We don't actually need it for anything here. + OperatorFactory.getAndMakeChild(fsOutputDesc, inputRS, tsMerge); // If the input FileSinkOperator is a dynamic partition enabled, the tsMerge input schema // needs to include the partition column, and the fsOutput should have @@ -1305,9 +1311,7 @@ public final class GenMapRedUtils { // // 2. 
Constructing a conditional task consisting of a move task and a map reduce task // - // TODO# movetask is created here; handle MM tables - MoveWork dummyMv = new MoveWork(null, null, null, - new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, null), false); + Path inputDirName = fsInputDesc.getMergeInputDirName(); MapWork cplan; Serializable work; @@ -1348,8 +1352,15 @@ public final class GenMapRedUtils { cplan.setInputformat("org.apache.hadoop.hive.ql.io.CombineHiveInputFormat"); // NOTE: we should gather stats in MR1 rather than MR2 at merge job since we don't // know if merge MR2 will be triggered at execution time + MoveWork dummyMv = null; + if (srcMmWriteId == null) { + // Only create the movework for non-MM table. No action needed for a MM table. + Utilities.LOG14535.info("creating dummy movetask for merge (with lfd)"); + dummyMv = new MoveWork(null, null, null, + new LoadFileDesc(inputDirName, finalName, true, null, null), false); + } ConditionalTask cndTsk = GenMapRedUtils.createCondTask(conf, currTask, dummyMv, work, - fsInputDesc.getFinalDirName().toString()); + fsInputDesc.getMergeInputDirName().toString()); // keep the dynamic partition context in conditional task resolver context ConditionalResolverMergeFilesCtx mrCtx = @@ -1360,7 +1371,13 @@ public final class GenMapRedUtils { // // 3. add the moveTask as the children of the conditional task // - linkMoveTask(fsOutput, cndTsk, mvTasks, conf, dependencyTask); + // Use the original fsOp path here in case of MM - while the new FSOP merges files inside the + // MM directory, the original MoveTask still commits based on the parent. Note that this path + // can only be triggered for a merge that's part of insert for now; MM tables do not support + // concatenate. Keeping the old logic for non-MM tables with temp directories and stuff. + Path fsopPath = srcMmWriteId != null + ? fsInputDesc.getFinalDirName() : fsOutputDesc.getFinalDirName(); + linkMoveTask(fsopPath, cndTsk, mvTasks, conf, dependencyTask); } /** @@ -1373,11 +1390,11 @@ public final class GenMapRedUtils { * @param hconf * @param dependencyTask */ - public static void linkMoveTask(FileSinkOperator newOutput, + private static void linkMoveTask(Path fsopPath, ConditionalTask cndTsk, List<Task<MoveWork>> mvTasks, HiveConf hconf, DependencyCollectionTask dependencyTask) { - Task<MoveWork> mvTask = GenMapRedUtils.findMoveTask(mvTasks, newOutput); + Task<MoveWork> mvTask = GenMapRedUtils.findMoveTaskForFsopOutput(mvTasks, fsopPath); for (Task<? extends Serializable> tsk : cndTsk.getListTasks()) { linkMoveTask(mvTask, tsk, hconf, dependencyTask); @@ -1392,7 +1409,7 @@ public final class GenMapRedUtils { * @param hconf * @param dependencyTask */ - public static void linkMoveTask(Task<MoveWork> mvTask, + private static void linkMoveTask(Task<MoveWork> mvTask, Task<? 
extends Serializable> task, HiveConf hconf, DependencyCollectionTask dependencyTask) { @@ -1527,10 +1544,11 @@ public final class GenMapRedUtils { TableScanOperator topOp, FileSinkDesc fsDesc) { ArrayList<String> aliases = new ArrayList<String>(); - Path inputDir = fsDesc.getFinalDirName(); + Path inputDir = fsDesc.getMergeInputDirName(); TableDesc tblDesc = fsDesc.getTableInfo(); aliases.add(inputDir.toString()); // dummy alias: just use the input path + Utilities.LOG14535.info("createMRWorkForMergingFiles for " + inputDir); // constructing the default MapredWork MapredWork cMrPlan = GenMapRedUtils.getMapRedWorkFromConf(conf); MapWork cplan = cMrPlan.getMapWork(); @@ -1555,8 +1573,9 @@ public final class GenMapRedUtils { */ public static MapWork createMergeTask(FileSinkDesc fsInputDesc, Path finalName, boolean hasDynamicPartitions, CompilationOpContext ctx) throws SemanticException { + + Path inputDir = fsInputDesc.getMergeInputDirName(); - Path inputDir = fsInputDesc.getFinalDirName(); TableDesc tblDesc = fsInputDesc.getTableInfo(); List<Path> inputDirs = new ArrayList<Path>(1); @@ -1580,6 +1599,7 @@ public final class GenMapRedUtils { + " format other than RCFile or ORCFile"); } + Utilities.LOG14535.info("creating mergefilework from " + inputDirs + " to " + finalName); // create the merge file work MergeFileWork work = new MergeFileWork(inputDirs, finalName, hasDynamicPartitions, tblDesc.getInputFileFormatClass().getName()); @@ -1602,6 +1622,7 @@ public final class GenMapRedUtils { } else { fmd = new OrcFileMergeDesc(); } + fmd.setMmWriteId(fsInputDesc.getMmWriteId()); fmd.setDpCtx(fsInputDesc.getDynPartCtx()); fmd.setOutputPath(finalName); fmd.setHasDynamicPartitions(work.hasDynamicPartitions()); @@ -1635,6 +1656,7 @@ public final class GenMapRedUtils { public static ConditionalTask createCondTask(HiveConf conf, Task<? extends Serializable> currTask, MoveWork mvWork, Serializable mergeWork, String inputPath) { + Utilities.LOG14535.info("Creating conditional merge task for " + inputPath); // There are 3 options for this ConditionalTask: // 1) Merge the partitions @@ -1642,10 +1664,14 @@ public final class GenMapRedUtils { // 3) Merge some partitions and move other partitions (i.e. merge some partitions and don't // merge others) in this case the merge is done first followed by the move to prevent // conflicts. + // TODO: if we are not dealing with concatenate DDL, we should not create a merge+move path + // because it should be impossible to get incompatible outputs. + // Create a dummy task if no move is needed. + Serializable moveWork = mvWork != null ? mvWork : new DependencyCollectionWork(); Task<? extends Serializable> mergeOnlyMergeTask = TaskFactory.get(mergeWork, conf); - Task<? extends Serializable> moveOnlyMoveTask = TaskFactory.get(mvWork, conf); + Task<? extends Serializable> moveOnlyMoveTask = TaskFactory.get(moveWork, conf); Task<? extends Serializable> mergeAndMoveMergeTask = TaskFactory.get(mergeWork, conf); - Task<? extends Serializable> mergeAndMoveMoveTask = TaskFactory.get(mvWork, conf); + Task<? extends Serializable> mergeAndMoveMoveTask = TaskFactory.get(moveWork, conf); // NOTE! 
It is necessary merge task is the parent of the move task, and not // the other way around, for the proper execution of the execute method of @@ -1653,7 +1679,7 @@ public final class GenMapRedUtils { mergeAndMoveMergeTask.addDependentTask(mergeAndMoveMoveTask); List<Serializable> listWorks = new ArrayList<Serializable>(); - listWorks.add(mvWork); + listWorks.add(moveWork); listWorks.add(mergeWork); ConditionalWork cndWork = new ConditionalWork(listWorks); @@ -1689,8 +1715,8 @@ public final class GenMapRedUtils { .isSkewedStoredAsDir(); } - public static Task<MoveWork> findMoveTask( - List<Task<MoveWork>> mvTasks, FileSinkOperator fsOp) { + public static Task<MoveWork> findMoveTaskForFsopOutput( + List<Task<MoveWork>> mvTasks, Path fsopFinalDir) { // find the move task for (Task<MoveWork> mvTsk : mvTasks) { MoveWork mvWork = mvTsk.getWork(); @@ -1700,9 +1726,10 @@ public final class GenMapRedUtils { } else if (mvWork.getLoadTableWork() != null) { srcDir = mvWork.getLoadTableWork().getSourcePath(); } + Utilities.LOG14535.info("Observing MoveWork " + System.identityHashCode(mvWork) + + " with " + srcDir + " while looking for " + fsopFinalDir); - if ((srcDir != null) - && (srcDir.equals(fsOp.getConf().getFinalDirName()))) { + if ((srcDir != null) && srcDir.equals(fsopFinalDir)) { return mvTsk; } } @@ -1722,59 +1749,58 @@ public final class GenMapRedUtils { Task<? extends Serializable> currTask, boolean isInsertTable) { // Has the user enabled merging of files for map-only jobs or for all jobs - if ((mvTasks != null) && (!mvTasks.isEmpty())) { - - // no need of merging if the move is to a local file system - MoveTask mvTask = (MoveTask) GenMapRedUtils.findMoveTask(mvTasks, fsOp); - - if (mvTask != null && isInsertTable && hconf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER) - && !fsOp.getConf().isMaterialization()) { - // mark the MapredWork and FileSinkOperator for gathering stats - fsOp.getConf().setGatherStats(true); - fsOp.getConf().setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE)); - if (!mvTask.hasFollowingStatsTask()) { - GenMapRedUtils.addStatsTask(fsOp, mvTask, currTask, hconf); - } + if (mvTasks == null || mvTasks.isEmpty()) return false; + + // no need of merging if the move is to a local file system + // We are looking based on the original FSOP, so use the original path as is. + MoveTask mvTask = (MoveTask) GenMapRedUtils.findMoveTaskForFsopOutput( + mvTasks, fsOp.getConf().getFinalDirName()); + + // TODO: wtf? wtf?!! why is this in this method? 
+ if (mvTask != null && isInsertTable && hconf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER) + && !fsOp.getConf().isMaterialization()) { + // mark the MapredWork and FileSinkOperator for gathering stats + fsOp.getConf().setGatherStats(true); + fsOp.getConf().setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE)); + if (!mvTask.hasFollowingStatsTask()) { + GenMapRedUtils.addStatsTask(fsOp, mvTask, currTask, hconf); } + } - if ((mvTask != null) && !mvTask.isLocal() && fsOp.getConf().canBeMerged()) { + if (mvTask == null || mvTask.isLocal() || !fsOp.getConf().canBeMerged()) return false; - if (currTask.getWork() instanceof TezWork) { - // tez blurs the boundary between map and reduce, thus it has it's own - // config - return hconf.getBoolVar(ConfVars.HIVEMERGETEZFILES); - } else if (currTask.getWork() instanceof SparkWork) { - // spark has its own config for merging - return hconf.getBoolVar(ConfVars.HIVEMERGESPARKFILES); - } + if (currTask.getWork() instanceof TezWork) { + // tez blurs the boundary between map and reduce, thus it has it's own config + return hconf.getBoolVar(ConfVars.HIVEMERGETEZFILES); + } else if (currTask.getWork() instanceof SparkWork) { + // spark has its own config for merging + return hconf.getBoolVar(ConfVars.HIVEMERGESPARKFILES); + } + return isMergeRequiredForMr(hconf, fsOp, currTask); + } - if (fsOp.getConf().isLinkedFileSink()) { - // If the user has HIVEMERGEMAPREDFILES set to false, the idea was the - // number of reducers are few, so the number of files anyway are small. - // However, with this optimization, we are increasing the number of files - // possibly by a big margin. So, merge aggresively. - if (hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) || - hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES)) { - return true; - } - } else { - // There are separate configuration parameters to control whether to - // merge for a map-only job - // or for a map-reduce job - if (currTask.getWork() instanceof MapredWork) { - ReduceWork reduceWork = ((MapredWork) currTask.getWork()).getReduceWork(); - boolean mergeMapOnly = - hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && reduceWork == null; - boolean mergeMapRed = - hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES) && - reduceWork != null; - if (mergeMapOnly || mergeMapRed) { - return true; - } - } else { - return false; - } - } + private static boolean isMergeRequiredForMr(HiveConf hconf, + FileSinkOperator fsOp, Task<? extends Serializable> currTask) { + if (fsOp.getConf().isLinkedFileSink()) { + // If the user has HIVEMERGEMAPREDFILES set to false, the idea was the + // number of reducers are few, so the number of files anyway are small. + // However, with this optimization, we are increasing the number of files + // possibly by a big margin. So, merge aggresively. 
+ return (hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) || + hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES)); + } + // There are separate configuration parameters to control whether to + // merge for a map-only job + // or for a map-reduce job + if (currTask.getWork() instanceof MapredWork) { + ReduceWork reduceWork = ((MapredWork) currTask.getWork()).getReduceWork(); + boolean mergeMapOnly = + hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && reduceWork == null; + boolean mergeMapRed = + hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES) && + reduceWork != null; + if (mergeMapOnly || mergeMapRed) { + return true; } } return false; @@ -1798,36 +1824,38 @@ public final class GenMapRedUtils { Path dest = null; + FileSinkDesc fileSinkDesc = fsOp.getConf(); + boolean isMmTable = fileSinkDesc.isMmTable(); if (chDir) { - FileSinkDesc fileSinkDesc = fsOp.getConf(); - dest = fileSinkDesc.getFinalDirName(); - - // generate the temporary file - // it must be on the same file system as the current destination - Context baseCtx = parseCtx.getContext(); - - // Create the required temporary file in the HDFS location if the destination - // path of the FileSinkOperator table is a blobstore path. - // TODO# special case #N - linked FDs (unions?) - Path tmpDir = baseCtx.getTempDirForPath(fileSinkDesc.getDestPath()); - - // Change all the linked file sink descriptors - if (fileSinkDesc.isLinkedFileSink()) { - for (FileSinkDesc fsConf:fileSinkDesc.getLinkedFileSinkDesc()) { - fsConf.setParentDir(tmpDir); - fsConf.setDirName(new Path(tmpDir, fsConf.getDirName().getName())); - Utilities.LOG14535.info("createMoveTask setting tmpDir for LinkedFileSink chDir " + fsConf.getDirName() + "; new parent " + tmpDir + ", dest was " + fileSinkDesc.getDestPath()); + + dest = fileSinkDesc.getMergeInputDirName(); + if (!isMmTable) { + // generate the temporary file + // it must be on the same file system as the current destination + Context baseCtx = parseCtx.getContext(); + + // Create the required temporary file in the HDFS location if the destination + // path of the FileSinkOperator table is a blobstore path. 
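+ // (MM tables skip this redirection entirely: dest already points at the
+ // write-id prefix under the final location, via getMergeInputDirName above.)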
+ Path tmpDir = baseCtx.getTempDirForPath(fileSinkDesc.getDestPath()); + + // Change all the linked file sink descriptors + if (fileSinkDesc.isLinkedFileSink()) { + for (FileSinkDesc fsConf:fileSinkDesc.getLinkedFileSinkDesc()) { + fsConf.setParentDir(tmpDir); + fsConf.setDirName(new Path(tmpDir, fsConf.getDirName().getName())); + Utilities.LOG14535.info("createMoveTask setting tmpDir for LinkedFileSink chDir " + fsConf.getDirName() + "; new parent " + tmpDir + ", dest was " + fileSinkDesc.getDestPath()); + } + } else { + fileSinkDesc.setDirName(tmpDir); + Utilities.LOG14535.info("createMoveTask setting tmpDir chDir " + tmpDir + "; dest was " + fileSinkDesc.getDestPath()); } - } else { - fileSinkDesc.setDirName(tmpDir); - Utilities.LOG14535.info("createMoveTask setting tmpDir for LinkedFileSink chDir " + tmpDir + "; dest was " + fileSinkDesc.getDestPath()); } } Task<MoveWork> mvTask = null; if (!chDir) { - mvTask = GenMapRedUtils.findMoveTask(mvTasks, fsOp); + mvTask = GenMapRedUtils.findMoveTaskForFsopOutput(mvTasks, fsOp.getConf().getFinalDirName()); } // Set the move task to be dependent on the current task http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index 5348500..03c2e79 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -1586,6 +1586,10 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { try { tblObj = getTable(tableName); + // TODO: we should probably block all ACID tables here. + if (MetaStoreUtils.isMmTable(tblObj.getParameters())) { + throw new SemanticException("Merge is not supported for MM tables"); + } List<String> bucketCols = null; Class<? extends InputFormat> inputFormatClass = null; @@ -1676,9 +1680,8 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc, partSpec == null ? new HashMap<String, String>() : partSpec); ltd.setLbCtx(lbCtx); - // TODO# movetask is created here; handle MM tables - Task<MoveWork> moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), - conf); + // No need to handle MM tables - unsupported path. 
+ Task<MoveWork> moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf); mergeTask.addDependentTask(moveTsk); if (conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) { http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java index 73cc95a..5c67fe2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java @@ -304,7 +304,6 @@ public class GenTezUtils { linked = context.linkedFileSinks.get(path); linked.add(desc); - // TODO# special case #N - unions (tez) desc.setDirName(new Path(path, "" + linked.size())); Utilities.LOG14535.info("removing union - new desc with " + desc.getDirName() + "; parent " + path); desc.setLinkedFileSink(true); @@ -374,8 +373,7 @@ public class GenTezUtils { // If underlying data is RCFile or OrcFile, RCFileBlockMerge task or // OrcFileStripeMerge task would be created. LOG.info("using CombineHiveInputformat for the merge job"); - Utilities.LOG14535.info("merging files from " + fileSink.getConf().getDirName() + " to " + finalName); - // TODO# special case #N - merge + Utilities.LOG14535.info("will generate MR work for merging files from " + fileSink.getConf().getDirName() + " to " + finalName); GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName, context.dependencyTask, context.moveTask, hconf, context.currentTask); http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index ede1bda..66e2d27 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -6575,7 +6575,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } else { queryTmpdir = ctx.getTempDirForPath(dest_path); } - Utilities.LOG14535.info("createFS for table specifying " + queryTmpdir + " from " + dest_path); + Utilities.LOG14535.info("create filesink w/DEST_TABLE specifying " + queryTmpdir + " from " + dest_path); if (dpCtx != null) { // set the root of the temporary path where dynamic partition columns will populate dpCtx.setRootPath(queryTmpdir); @@ -6644,7 +6644,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { isMmTable = MetaStoreUtils.isMmTable(dest_tab.getParameters()); queryTmpdir = isMmTable ? 
dest_path : ctx.getTempDirForPath(dest_path); - Utilities.LOG14535.info("createFS for partition specifying " + queryTmpdir + " from " + dest_path); + Utilities.LOG14535.info("create filesink w/DEST_PARTITION specifying " + queryTmpdir + " from " + dest_path); table_desc = Utilities.getTableDesc(dest_tab); // Add sorting/bucketing if needed http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java index ffc9c3e..4635f18 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java @@ -76,14 +76,6 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver, } /** - * @param dir - * the dir to set - */ - public void setDir(String dir) { - this.dir = dir; - } - - /** * @return the listTasks */ public List<Task<? extends Serializable>> getListTasks() { @@ -121,8 +113,7 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver, } } - public List<Task<? extends Serializable>> getTasks(HiveConf conf, - Object objCtx) { + public List<Task<? extends Serializable>> getTasks(HiveConf conf, Object objCtx) { ConditionalResolverMergeFilesCtx ctx = (ConditionalResolverMergeFilesCtx) objCtx; String dirName = ctx.getDir(); @@ -179,6 +170,8 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver, if(lbLevel == 0) { // static partition without list bucketing long totalSz = getMergeSize(inpFs, dirPath, avgConditionSize); + Utilities.LOG14535.info("merge resolve simple case - totalSz " + totalSz + " from " + dirPath); + if (totalSz >= 0) { // add the merge job setupMapRedWork(conf, work, trgtSize, totalSz); resTsks.add(mrTask); @@ -192,6 +185,7 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver, } } } else { + Utilities.LOG14535.info("Resolver returning movetask for " + dirPath); resTsks.add(mvTask); } } catch (IOException e) { @@ -234,6 +228,7 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver, Task<? extends Serializable> mrTask, Task<? extends Serializable> mrAndMvTask, Path dirPath, FileSystem inpFs, ConditionalResolverMergeFilesCtx ctx, MapWork work, int dpLbLevel) throws IOException { + Utilities.LOG14535.info("generateActualTasks for " + dirPath); DynamicPartitionCtx dpCtx = ctx.getDPCtx(); // get list of dynamic partitions FileStatus[] status = HiveStatsUtils.getFileStatusRecurse(dirPath, dpLbLevel, inpFs); @@ -281,6 +276,7 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver, // add the move task for those partitions that do not need merging if (toMove.size() > 0) { + // Note: this path should be specific to concatenate; never executed in a select query. 
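+ // toMove collects the partition directories whose files are already large
+ // enough to skip merging; they go straight to the move task.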
// modify the existing move task as it is already in the candidate running tasks // running the MoveTask and MR task in parallel may @@ -362,6 +358,7 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver, long totalSz = 0; int numFiles = 0; for (FileStatus fStat : fStats) { + Utilities.LOG14535.info("Resolver looking at " + fStat.getPath()); if (fStat.isDir()) { AverageSize avgSzDir = getAverageSize(inpFs, fStat.getPath()); if (avgSzDir.getTotalSize() < 0) { http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java index 7ec1bdd..615c63d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java @@ -28,6 +28,7 @@ public class FileMergeDesc extends AbstractOperatorDesc { private int listBucketingDepth; private boolean hasDynamicPartitions; private boolean isListBucketingAlterTableConcatenate; + private Long mmWriteId; public FileMergeDesc(DynamicPartitionCtx dynPartCtx, Path outputDir) { this.dpCtx = dynPartCtx; @@ -73,4 +74,12 @@ public class FileMergeDesc extends AbstractOperatorDesc { public void setListBucketingAlterTableConcatenate(boolean isListBucketingAlterTableConcatenate) { this.isListBucketingAlterTableConcatenate = isListBucketingAlterTableConcatenate; } + + public Long getMmWriteId() { + return mmWriteId; + } + + public void setMmWriteId(Long mmWriteId) { + this.mmWriteId = mmWriteId; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java index def1c5f..8bef7a9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ValidWriteIds; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.Explain.Level; @@ -182,6 +183,13 @@ public class FileSinkDesc extends AbstractOperatorDesc { return linkedFileSink ? parentDir : dirName; } + /** getFinalDirName that takes into account MM, but not DP, LB or buckets. 
*/ + public Path getMergeInputDirName() { + Path root = getFinalDirName(); + if (mmWriteId == null) return root; + return new Path(root, ValidWriteIds.getMmFilePrefix(mmWriteId)); + } + @Explain(displayName = "table", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public TableDesc getTableInfo() { return tableInfo; @@ -255,7 +263,7 @@ public class FileSinkDesc extends AbstractOperatorDesc { return mmWriteId != null; } - public long getMmWriteId() { + public Long getMmWriteId() { return mmWriteId; } @@ -485,6 +493,10 @@ public class FileSinkDesc extends AbstractOperatorDesc { this.statsTmpDir = statsCollectionTempDir; } + public void setMmWriteId(Long mmWriteId) { + this.mmWriteId = mmWriteId; + } + public class FileSinkOperatorExplainVectorization extends OperatorExplainVectorization { public FileSinkOperatorExplainVectorization(VectorDesc vectorDesc) { http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java index 9f498c7..f0b2775 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java @@ -23,6 +23,7 @@ import java.util.HashSet; import java.util.List; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.plan.Explain.Level; @@ -59,7 +60,7 @@ public class MoveWork implements Serializable { public MoveWork() { } - public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs) { + private MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs) { this.inputs = inputs; this.outputs = outputs; } @@ -68,6 +69,8 @@ public class MoveWork implements Serializable { final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork, boolean checkFileFormat, boolean srcLocal) { this(inputs, outputs); + Utilities.LOG14535.info("Creating MoveWork " + System.identityHashCode(this) + + " with " + loadTableWork + "; " + loadFileWork); this.loadTableWork = loadTableWork; this.loadFileWork = loadFileWork; this.checkFileFormat = checkFileFormat; @@ -77,10 +80,7 @@ public class MoveWork implements Serializable { public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs, final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork, boolean checkFileFormat) { - this(inputs, outputs); - this.loadTableWork = loadTableWork; - this.loadFileWork = loadFileWork; - this.checkFileFormat = checkFileFormat; + this(inputs, outputs, loadTableWork, loadFileWork, checkFileFormat, false); } @Explain(displayName = "tables", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/test/queries/clientpositive/mm_all.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/mm_all.q b/ql/src/test/queries/clientpositive/mm_all.q index 1f85c48..8ce42a2 100644 --- a/ql/src/test/queries/clientpositive/mm_all.q +++ b/ql/src/test/queries/clientpositive/mm_all.q @@ -131,6 +131,42 @@ drop table skew_dp_union_mm; +set hive.merge.orcfile.stripe.level=true; +set hive.merge.tezfiles=true; +set hive.merge.mapfiles=true; +set 
hive.merge.mapredfiles=true; + + +create table merge0_mm (id int) stored as orc tblproperties('hivecommit'='true'); + +insert into table merge0_mm select key from intermediate; +select * from merge0_mm; + +set tez.grouping.split-count=1; +insert into table merge0_mm select key from intermediate; +set tez.grouping.split-count=0; +select * from merge0_mm; + +drop table merge0_mm; + + +create table merge1_mm (id int) partitioned by (key int) stored as orc tblproperties('hivecommit'='true'); + +insert into table merge1_mm partition (key) select key, key from intermediate; +select * from merge1_mm; + +set tez.grouping.split-count=1; +insert into table merge1_mm partition (key) select key, key from intermediate; +set tez.grouping.split-count=0; +select * from merge1_mm; + +drop table merge1_mm; + + +-- TODO: need to include merge+union, but it's broken for now + + + @@ -140,31 +176,14 @@ drop table skew_dp_union_mm; ---drop table merge_mm; + + --drop table ctas_mm; -- -- --create table ctas_mm tblproperties ('hivecommit'='true') as select * from src limit 3; -- -- ---set hive.merge.mapredfiles=true; ---set hive.merge.sparkfiles=true; ---set hive.merge.tezfiles=true; --- ---CREATE TABLE merge_mm (key INT, value STRING) --- PARTITIONED BY (ds STRING, part STRING) STORED AS ORC tblproperties ('hivecommit'='true'); --- ---EXPLAIN ---INSERT OVERWRITE TABLE merge_mm PARTITION (ds='123', part) --- SELECT key, value, PMOD(HASH(key), 2) as part --- FROM src; --- ---INSERT OVERWRITE TABLE merge_mm PARTITION (ds='123', part) --- SELECT key, value, PMOD(HASH(key), 2) as part --- FROM src; --- --- --- ---- TODO load, multi-insert etc -- -- http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/test/queries/clientpositive/mm_current.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/mm_current.q b/ql/src/test/queries/clientpositive/mm_current.q index ceb7a1a..f423b00 100644 --- a/ql/src/test/queries/clientpositive/mm_current.q +++ b/ql/src/test/queries/clientpositive/mm_current.q @@ -8,45 +8,17 @@ set hive.tez.auto.reducer.parallelism=false; drop table intermediate; create table intermediate(key int) partitioned by (p int) stored as orc; -insert into table intermediate partition(p='455') select key from src limit 2; -insert into table intermediate partition(p='456') select key from src limit 2; +insert into table intermediate partition(p='455') select distinct key from src where key >= 0 order by key desc limit 2; +insert into table intermediate partition(p='456') select distinct key from src where key is not null order by key asc limit 2; -set hive.optimize.skewjoin.compiletime = true; +set hive.merge.orcfile.stripe.level=true; +set hive.merge.tezfiles=true; +set hive.merge.mapfiles=true; +set hive.merge.mapredfiles=true; -create table skew_mm(k1 int, k2 int, k4 int) skewed by (k1, k4) on ((0,0),(1,1),(2,2),(3,3)) - stored as directories tblproperties ('hivecommit'='false'); -insert into table skew_mm -select key, key, key from intermediate; -drop table skew_mm; - - -create table skew_mm(k1 int, k2 int, k4 int) skewed by (k1, k4) on ((0,0),(1,1),(2,2),(3,3)) - stored as directories tblproperties ('hivecommit'='true'); - -insert into table skew_mm -select key, key, key from intermediate; - -select * from skew_mm; -drop table skew_mm; - - - - - -create table skew_mm(k1 int, k2 int, k4 int) partitioned by (k3 int) -skewed by (k1, k4) on ((0,0),(1,1),(2,2),(3,3)) stored as directories tblproperties ('hivecommit'='true'); - -insert 
into table skew_mm partition (k3) -select key as i, key as j, key as k, key as l from intermediate -union all -select key +1 as i, key +2 as j, key +3 as k, key +4 as l from intermediate; - - -select * from skew_mm; -drop table skew_mm;
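For reference, the path selection that drives the new merge-input logic reduces to the sketch below. This is not Hive code: MergeInputDirSketch and mergeInputDir are illustrative stand-ins for FileSinkDesc.getMergeInputDirName, and the "mm_" + writeId prefix format is an assumption standing in for whatever ValidWriteIds.getMmFilePrefix actually returns; only the branch structure mirrors the diff above (MM tables get a write-id subdirectory under the final directory, everything else keeps the directory returned by getFinalDirName). Assumes hadoop-common on the classpath for org.apache.hadoop.fs.Path.

import org.apache.hadoop.fs.Path;

public class MergeInputDirSketch {
  // Stand-in for FileSinkDesc.getMergeInputDirName(): MM tables append a
  // write-id-specific prefix under the final directory; others do not.
  static Path mergeInputDir(Path finalDir, Long mmWriteId) {
    if (mmWriteId == null) {
      return finalDir; // non-MM: the merge reads the final (temp) dir as-is
    }
    // "mm_" + id is an assumed prefix format, for illustration only.
    return new Path(finalDir, "mm_" + mmWriteId);
  }

  public static void main(String[] args) {
    System.out.println(mergeInputDir(new Path("/warehouse/t"), null)); // /warehouse/t
    System.out.println(mergeInputDir(new Path("/warehouse/t"), 42L)); // /warehouse/t/mm_42
  }
}

This lines up with the SemanticAnalyzer change above: for MM tables queryTmpdir is dest_path itself rather than a scratch directory, so the merge has to read the write-id prefix in place instead of a temp dir.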