HIVE-14640 : handle hive.merge.*files in select queries (Sergey Shelukhin)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/eacf9f9b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/eacf9f9b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/eacf9f9b

Branch: refs/heads/hive-14535
Commit: eacf9f9b6d7405b68def88ffc5fd755222375efc
Parents: bd78d66
Author: Sergey Shelukhin <ser...@apache.org>
Authored: Thu Oct 13 17:18:46 2016 -0700
Committer: Sergey Shelukhin <ser...@apache.org>
Committed: Thu Oct 13 17:18:46 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   2 +-
 .../hive/ql/exec/AbstractFileMergeOperator.java | 181 +++++++++------
 .../hadoop/hive/ql/exec/FileSinkOperator.java   | 187 ++-------------
 .../apache/hadoop/hive/ql/exec/MoveTask.java    |   8 +-
 .../hive/ql/exec/OrcFileMergeOperator.java      |  11 +-
 .../hive/ql/exec/RCFileMergeOperator.java       |   3 +-
 .../apache/hadoop/hive/ql/exec/Utilities.java   | 225 ++++++++++++++++--
 .../rcfile/truncate/ColumnTruncateMapper.java   |   1 +
 .../apache/hadoop/hive/ql/metadata/Hive.java    |   1 +
 .../hive/ql/optimizer/GenMapRedUtils.java       | 214 +++++++++--------
 .../hive/ql/parse/DDLSemanticAnalyzer.java      |   9 +-
 .../hadoop/hive/ql/parse/GenTezUtils.java       |   4 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |   4 +-
 .../ql/plan/ConditionalResolverMergeFiles.java  |  17 +-
 .../hadoop/hive/ql/plan/FileMergeDesc.java      |   9 +
 .../hadoop/hive/ql/plan/FileSinkDesc.java       |  14 +-
 .../apache/hadoop/hive/ql/plan/MoveWork.java    |  10 +-
 ql/src/test/queries/clientpositive/mm_all.q     |  57 +++--
 ql/src/test/queries/clientpositive/mm_current.q |  40 +---
 .../results/clientpositive/llap/mm_all.q.out    | 232 +++++++++++++++----
 .../clientpositive/llap/mm_current.q.out        | 165 +------------
 21 files changed, 758 insertions(+), 636 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index c89142c..6201c04 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3122,7 +3122,7 @@ public class HiveConf extends Configuration {
 
     HIVE_METASTORE_MM_HEARTBEAT_TIMEOUT("hive.metastore.mm.heartbeat.timeout", "1800s",
         new TimeValidator(TimeUnit.SECONDS),
-        "MM write ID times out after this long if a heartbeat is not send. Currently disabled."), // TODO# heartbeating not implemented
+        "MM write ID times out after this long if a heartbeat is not send. Currently disabled."),
 
     HIVE_METASTORE_MM_ABSOLUTE_TIMEOUT("hive.metastore.mm.absolute.timeout", "7d",
         new TimeValidator(TimeUnit.SECONDS),

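For context, a minimal sketch (not part of this patch) of how a time-typed setting such as hive.metastore.mm.heartbeat.timeout is typically read back from HiveConf; it assumes an existing HiveConf instance:

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hive.conf.HiveConf;

    public class MmHeartbeatTimeoutSketch {
      // The TimeValidator above accepts values like "1800s"; getTimeVar converts
      // the configured value into the requested unit (seconds here).
      static long heartbeatTimeoutSeconds(HiveConf conf) {
        return conf.getTimeVar(
            HiveConf.ConfVars.HIVE_METASTORE_MM_HEARTBEAT_TIMEOUT, TimeUnit.SECONDS);
      }
    }
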
http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
index 40c784b..dedbb78 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
@@ -34,6 +34,8 @@ import org.apache.hadoop.mapred.JobConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+
 /**
  * Fast file merge operator for ORC and RCfile. This is an abstract class which
  * does not process any rows. Refer {@link org.apache.hadoop.hive.ql.exec.OrcFileMergeOperator}
@@ -47,20 +49,21 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc>
 
   protected JobConf jc;
   protected FileSystem fs;
-  protected boolean autoDelete;
-  protected boolean exception;
-  protected Path outPath;
-  protected Path finalPath;
-  protected Path dpPath;
-  protected Path tmpPath;
-  protected Path taskTmpPath;
-  protected int listBucketingDepth;
-  protected boolean hasDynamicPartitions;
-  protected boolean isListBucketingAlterTableConcatenate;
-  protected boolean tmpPathFixedConcatenate;
-  protected boolean tmpPathFixed;
-  protected Set<Path> incompatFileSet;
-  protected transient DynamicPartitionCtx dpCtx;
+  private boolean autoDelete;
+  private Path outPath; // The output path used by the subclasses.
+  private Path finalPath; // Used as a final destination; same as outPath for MM tables.
+  private Path dpPath;
+  private Path tmpPath; // Only stored to update based on the original in fixTmpPath.
+  private Path taskTmpPath; // Only stored to update based on the original in fixTmpPath.
+  private int listBucketingDepth;
+  private boolean hasDynamicPartitions;
+  private boolean isListBucketingAlterTableConcatenate;
+  private boolean tmpPathFixedConcatenate;
+  private boolean tmpPathFixed;
+  private Set<Path> incompatFileSet;
+  private transient DynamicPartitionCtx dpCtx;
+  private boolean isMmTable;
+  private String taskId;
 
   /** Kryo ctor. */
   protected AbstractFileMergeOperator() {
@@ -77,39 +80,50 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc>
     this.jc = new JobConf(hconf);
     incompatFileSet = new HashSet<Path>();
     autoDelete = false;
-    exception = false;
     tmpPathFixed = false;
     tmpPathFixedConcatenate = false;
-    outPath = null;
-    finalPath = null;
     dpPath = null;
-    tmpPath = null;
-    taskTmpPath = null;
     dpCtx = conf.getDpCtx();
     hasDynamicPartitions = conf.hasDynamicPartitions();
     isListBucketingAlterTableConcatenate = conf
         .isListBucketingAlterTableConcatenate();
     listBucketingDepth = conf.getListBucketingDepth();
     Path specPath = conf.getOutputPath();
-    updatePaths(Utilities.toTempPath(specPath),
-        Utilities.toTaskTempPath(specPath));
+    isMmTable = conf.getMmWriteId() != null;
+    if (isMmTable) {
+      updatePaths(specPath, null);
+    } else {
+      updatePaths(Utilities.toTempPath(specPath), Utilities.toTaskTempPath(specPath));
+    }
     try {
       fs = specPath.getFileSystem(hconf);
-      autoDelete = fs.deleteOnExit(outPath);
+      if (!isMmTable) {
+        // Do not delete for MM tables. We either want the file if we succeed, or we must
+        // delete it explicitly before proceeding if the merge fails.
+        autoDelete = fs.deleteOnExit(outPath);
+      }
     } catch (IOException e) {
-      this.exception = true;
-      throw new HiveException("Failed to initialize AbstractFileMergeOperator",
-          e);
+      throw new HiveException("Failed to initialize AbstractFileMergeOperator", e);
     }
   }
 
   // sets up temp and task temp path
   private void updatePaths(Path tp, Path ttp) {
-    String taskId = Utilities.getTaskId(jc);
+    if (taskId == null) {
+      taskId = Utilities.getTaskId(jc);
+    }
     tmpPath = tp;
-    taskTmpPath = ttp;
-    finalPath = new Path(tp, taskId);
-    outPath = new Path(ttp, Utilities.toTempPath(taskId));
+    if (isMmTable) {
+      taskTmpPath = null;
+      // Make sure we don't collide with the source.
+      outPath = finalPath = new Path(tmpPath, taskId + ".merged");
+    } else {
+      taskTmpPath = ttp;
+      finalPath = new Path(tp, taskId);
+      outPath = new Path(ttp, Utilities.toTempPath(taskId));
+    }
+    Utilities.LOG14535.info("Paths for merge " + taskId + ": tmp " + tmpPath + ", task "
+        + taskTmpPath + ", final " + finalPath + ", out " + outPath, new Exception());
   }
 
   /**
@@ -142,7 +156,7 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc>
   protected void fixTmpPath(Path inputPath, int depthDiff) throws IOException {
 
     // don't need to update tmp paths when there is no depth difference in paths
-    if (depthDiff <=0) {
+    if (depthDiff <= 0) {
       return;
     }
 
@@ -157,10 +171,12 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc>
     }
 
     Path newTmpPath = new Path(tmpPath, newPath);
-    Path newTaskTmpPath = new Path(taskTmpPath, newPath);
     if (!fs.exists(newTmpPath)) {
+      Utilities.LOG14535.info("Creating " + newTmpPath);
       fs.mkdirs(newTmpPath);
     }
+
+    Path newTaskTmpPath = (taskTmpPath != null) ? new Path(taskTmpPath, newPath) : null;
     updatePaths(newTmpPath, newTaskTmpPath);
   }
 
@@ -182,7 +198,7 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc>
   }
 
   protected void fixTmpPath(Path path) throws IOException {
-
+    Utilities.LOG14535.info("Calling fixTmpPath with " + path);
     // Fix temp path for alter table ... concatenate
     if (isListBucketingAlterTableConcatenate) {
       if (this.tmpPathFixedConcatenate) {
@@ -208,38 +224,49 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc>
   @Override
   public void closeOp(boolean abort) throws HiveException {
     try {
-      if (!abort) {
-        // if outPath does not exist, then it means all paths within combine split are skipped as
-        // they are incompatible for merge (for example: files without stripe stats).
-        // Those files will be added to incompatFileSet
-        if (fs.exists(outPath)) {
-          FileStatus fss = fs.getFileStatus(outPath);
+      if (abort) {
+        if (!autoDelete || isMmTable) {
+          fs.delete(outPath, true);
+        }
+        return;
+      }
+      // if outPath does not exist, then it means all paths within combine split are skipped as
+      // they are incompatible for merge (for example: files without stripe stats).
+      // Those files will be added to incompatFileSet
+      if (fs.exists(outPath)) {
+        FileStatus fss = fs.getFileStatus(outPath);
+        if (!isMmTable) {
           if (!fs.rename(outPath, finalPath)) {
-            throw new IOException(
-                "Unable to rename " + outPath + " to " + finalPath);
+            throw new IOException("Unable to rename " + outPath + " to " + finalPath);
           }
-          LOG.info("renamed path " + outPath + " to " + finalPath + " . File" +
-              " size is "
-              + fss.getLen());
+          LOG.info("Renamed path " + outPath + " to " + finalPath
+              + "(" + fss.getLen() + " bytes).");
+        } else {
+          assert finalPath.equals(outPath);
+          // There's always just one file that we have merged.
+          // The union/DP/etc. should already be accounted for in the path.
+          Utilities.writeMmCommitManifest(Lists.newArrayList(outPath),
+              tmpPath.getParent(), fs, taskId, conf.getMmWriteId(), null);
+          LOG.info("Merged into " + finalPath + "(" + fss.getLen() + " bytes).");
         }
+      }
 
-        // move any incompatible files to final path
-        if (incompatFileSet != null && !incompatFileSet.isEmpty()) {
-          for (Path incompatFile : incompatFileSet) {
-            Path destDir = finalPath.getParent();
-            try {
-              Utilities.renameOrMoveFiles(fs, incompatFile, destDir);
-              LOG.info("Moved incompatible file " + incompatFile + " to " +
-                  destDir);
-            } catch (HiveException e) {
-              LOG.error("Unable to move " + incompatFile + " to " + destDir);
-              throw new IOException(e);
-            }
-          }
+      // move any incompatible files to final path
+      if (incompatFileSet != null && !incompatFileSet.isEmpty()) {
+        if (isMmTable) {
+          // We only support query-time merge for MM tables, so don't handle this.
+          throw new HiveException("Incompatible files should not happen in MM tables.");
         }
-      } else {
-        if (!autoDelete) {
-          fs.delete(outPath, true);
+        for (Path incompatFile : incompatFileSet) {
+          Path destDir = finalPath.getParent();
+          try {
+            Utilities.renameOrMoveFiles(fs, incompatFile, destDir);
+            LOG.info("Moved incompatible file " + incompatFile + " to " +
+                destDir);
+          } catch (HiveException e) {
+            LOG.error("Unable to move " + incompatFile + " to " + destDir);
+            throw new IOException(e);
+          }
         }
       }
     } catch (IOException e) {
@@ -253,16 +280,26 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc>
     try {
       Path outputDir = conf.getOutputPath();
       FileSystem fs = outputDir.getFileSystem(hconf);
-      Path backupPath = backupOutputPath(fs, outputDir);
-      // TODO# merge-related move
-      Utilities.mvFileToFinalPath(outputDir, hconf, success, LOG, conf.getDpCtx(),
-              null, reporter);
-      if (success) {
-        LOG.info("jobCloseOp moved merged files to output dir: " + outputDir);
-      }
-      if (backupPath != null) {
-        fs.delete(backupPath, true);
+      Long mmWriteId = conf.getMmWriteId();
+      if (mmWriteId == null) {
+        Path backupPath = backupOutputPath(fs, outputDir);
+        Utilities.mvFileToFinalPath(
+            outputDir, hconf, success, LOG, conf.getDpCtx(), null, reporter);
+        if (success) {
+          LOG.info("jobCloseOp moved merged files to output dir: " + outputDir);
+        }
+        if (backupPath != null) {
+          fs.delete(backupPath, true);
+        }
+      } else {
+        int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(),
+            lbLevels = conf.getListBucketingDepth();
+        // We don't expect missing buckets from merge (actually there should be no buckets),
+        // so just pass null as bucketing context. Union suffix should also be accounted for.
+        Utilities.handleMmTableFinalPath(outputDir.getParent(), null, hconf, success,
+            dpLevels, lbLevels, null, mmWriteId, reporter);
       }
+
     } catch (IOException e) {
       throw new HiveException("Failed jobCloseOp for AbstractFileMergeOperator",
           e);
@@ -290,4 +327,12 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc>
   public static String getOperatorName() {
     return "MERGE";
   }
+
+  protected final Path getOutPath() {
+    return outPath;
+  }
+
+  protected final void addIncompatibleFile(Path path) {
+    incompatFileSet.add(path);
+  }
 }

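To summarize the path handling introduced above: for an MM table the merge operator writes its single output file in place, directly under the MM write directory, instead of going through the usual temp/task-temp rename step. A standalone sketch of that naming (all values hypothetical, for illustration only):

    import org.apache.hadoop.fs.Path;

    public class MmMergeOutputPathSketch {
      public static void main(String[] args) {
        Path mmDir = new Path("/warehouse/t/part=1/mm_17"); // hypothetical MM write directory
        String taskId = "000000_0";                         // hypothetical task id
        // outPath == finalPath for MM tables; the ".merged" suffix keeps the output
        // from colliding with any of the files being merged in the same directory.
        Path outPath = new Path(mmDir, taskId + ".merged");
        System.out.println("MM merge output: " + outPath);
      }
    }

On success the file is not renamed; it is recorded in the MM commit manifest (see writeMmCommitManifest in Utilities.java below), and uncommitted files are cleaned up at job close.
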
http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 5902036..dda4b51 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Utilities.MissingBucketsContext;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveKey;
@@ -98,7 +99,6 @@ import com.google.common.collect.Lists;
 public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     Serializable {
 
-  private static final String MANIFEST_EXTENSION = ".manifest";
   public static final Logger LOG = LoggerFactory.getLogger(FileSinkOperator.class);
   private static final boolean isInfoEnabled = LOG.isInfoEnabled();
   private static final boolean isDebugEnabled = LOG.isDebugEnabled();
@@ -1128,26 +1128,9 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
           fsp.commit(fs, commitPaths);
         }
       }
-      if (!commitPaths.isEmpty()) {
-        Path manifestPath = getManifestDir(specPath, childSpecPathDynLinkedPartitions);
-        manifestPath = new Path(manifestPath, "_tmp." + ValidWriteIds.getMmFilePrefix(
-            conf.getMmWriteId()) + "_" + taskId + MANIFEST_EXTENSION);
-        Utilities.LOG14535.info("Writing manifest to " + manifestPath + " with " + commitPaths);
-        try {
-          // Don't overwrite the manifest... should fail if we have collisions.
-          // We assume one FSOP per task (per specPath), so we create it in specPath.
-          try (FSDataOutputStream out = fs.create(manifestPath, false)) {
-            if (out == null) {
-              throw new HiveException("Failed to create manifest at " + manifestPath);
-            }
-            out.writeInt(commitPaths.size());
-            for (Path path : commitPaths) {
-              out.writeUTF(path.toString());
-            }
-          }
-        } catch (IOException e) {
-          throw new HiveException(e);
-        }
+      if (conf.getMmWriteId() != null) {
+        Utilities.writeMmCommitManifest(commitPaths, specPath, fs, taskId, conf.getMmWriteId(),
+            childSpecPathDynLinkedPartitions);
       }
       // Only publish stats if this operator's flag was set to gather stats
       if (conf.isGatherStats()) {
@@ -1165,9 +1148,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     super.closeOp(abort);
   }
 
-  private static Path getManifestDir(Path specPath, String unionSuffix) {
-    return (unionSuffix == null) ? specPath : new Path(specPath, unionSuffix);
-  }
 
   /**
    * @return the name of the operator
@@ -1196,9 +1176,17 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
           unionSuffix = conf.getDirName().getName();
         }
         if (!conf.isMmTable()) {
-          Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf, reporter); // TODO# other callers
+          Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf, reporter);
         } else {
-          handleMmTable(specPath, unionSuffix, hconf, success, dpCtx, lbCtx, conf, reporter);
+          int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(),
+              lbLevels = lbCtx.calculateListBucketingLevel();
+          // TODO: why is it stored in both?
+          int numBuckets = (conf.getTable() != null) ? conf.getTable().getNumBuckets()
+              : (dpCtx != null ? dpCtx.getNumBuckets() : 0);
+          MissingBucketsContext mbc = new MissingBucketsContext(
+              conf.getTableInfo(), numBuckets, conf.getCompressed());
+          Utilities.handleMmTableFinalPath(specPath, unionSuffix, hconf, success,
+              dpLevels, lbLevels, mbc, conf.getMmWriteId(), reporter);
         }
       }
     } catch (IOException e) {
@@ -1207,152 +1195,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     super.jobCloseOp(hconf, success);
   }
 
-  private static FileStatus[] getMmDirectoryCandidates(FileSystem fs, Path path,
-      DynamicPartitionCtx dpCtx, ListBucketingCtx lbCtx, String unionSuffix, PathFilter filter)
-          throws IOException {
-    StringBuilder sb = new StringBuilder(path.toUri().getPath());
-    int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(),
-        lbLevels = lbCtx == null ? 0 : lbCtx.getSkewedColNames().size();
-    for (int i = 0; i < dpLevels + lbLevels; i++) {
-      sb.append(Path.SEPARATOR).append("*");
-    }
-    if (unionSuffix != null) {
-      sb.append(Path.SEPARATOR).append(unionSuffix);
-    }
-    sb.append(Path.SEPARATOR).append("*"); // TODO: we could add exact mm prefix here
-    Utilities.LOG14535.info("Looking for files via: " + sb.toString());
-    Path pathPattern = new Path(path, sb.toString());
-    return fs.globStatus(pathPattern, filter);
-  }
-
-  private void handleMmTable(Path specPath, String unionSuffix, Configuration hconf,
-      boolean success, DynamicPartitionCtx dpCtx, ListBucketingCtx lbCtx, FileSinkDesc conf,
-      Reporter reporter) throws IOException, HiveException {
-    FileSystem fs = specPath.getFileSystem(hconf);
-    // Manifests would be at the root level, but the results at target level.
-    // TODO# special case - doesn't take bucketing into account
-    Path manifestDir = getManifestDir(specPath, unionSuffix);
-
-    ValidWriteIds.IdPathFilter filter = new ValidWriteIds.IdPathFilter(conf.getMmWriteId(), true);
-    if (!success) {
-      tryDeleteAllMmFiles(fs, specPath, manifestDir, dpCtx, lbCtx, unionSuffix, filter);
-      return;
-    }
-    FileStatus[] files = HiveStatsUtils.getFileStatusRecurse(manifestDir, 1, fs, filter);
-    Utilities.LOG14535.info("Looking for manifests in: " + manifestDir);
-    List<Path> manifests = new ArrayList<>();
-    if (files != null) {
-      for (FileStatus status : files) {
-        Path path = status.getPath();
-        if (path.getName().endsWith(MANIFEST_EXTENSION)) {
-          Utilities.LOG14535.info("Reading manifest " + path);
-          manifests.add(path);
-        }
-      }
-    }
-
-    Utilities.LOG14535.info("Looking for files in: " + specPath);
-    files = getMmDirectoryCandidates(fs, specPath, dpCtx, lbCtx, unionSuffix, filter);
-    ArrayList<FileStatus> results = new ArrayList<>();
-    if (files != null) {
-      for (FileStatus status : files) {
-        Path path = status.getPath();
-        Utilities.LOG14535.info("Looking at path: " + path + " from " + System.identityHashCode(this));
-        if (!status.isDirectory()) {
-          if (!path.getName().endsWith(MANIFEST_EXTENSION)) {
-            Utilities.LOG14535.warn("Unknown file found, deleting: " + path);
-            tryDelete(fs, path);
-          }
-        } else {
-          results.add(status);
-        }
-      }
-    }
-
-    HashSet<String> committed = new HashSet<>();
-    for (Path mfp : manifests) {
-      try (FSDataInputStream mdis = fs.open(mfp)) {
-        int fileCount = mdis.readInt();
-        for (int i = 0; i < fileCount; ++i) {
-          String nextFile = mdis.readUTF();
-          if (!committed.add(nextFile)) {
-            throw new HiveException(nextFile + " was specified in multiple manifests");
-          }
-        }
-      }
-    }
-
-    for (FileStatus status : results) {
-      for (FileStatus child : fs.listStatus(status.getPath())) {
-        Path childPath = child.getPath();
-        if (committed.remove(childPath.toString())) continue; // A good file.
-        Utilities.LOG14535.info("Deleting " + childPath + " that was not committed");
-        // We should actually succeed here - if we fail, don't commit the query.
-        if (!fs.delete(childPath, true)) {
-          throw new HiveException("Failed to delete an uncommitted path " + childPath);
-        }
-      }
-    }
-
-    if (!committed.isEmpty()) {
-      throw new HiveException("The following files were committed but not found: " + committed);
-    }
-    for (Path mfp : manifests) {
-      Utilities.LOG14535.info("Deleting manifest " + mfp);
-      tryDelete(fs, mfp);
-    }
-    // Delete the manifest directory if we only created it for manifests; otherwise the
-    // dynamic partition loader will find it and try to load it as a partition... what a mess.
-    if (manifestDir != specPath) {
-      FileStatus[] remainingFiles = fs.listStatus(manifestDir);
-      if (remainingFiles == null || remainingFiles.length == 0) {
-        Utilities.LOG14535.info("Deleting directory " + manifestDir);
-        tryDelete(fs, manifestDir);
-      }
-    }
-
-    if (results.isEmpty()) return;
-
-    // TODO: see HIVE-14886 - removeTempOrDuplicateFiles is broken for list bucketing,
-    //       so maintain parity here by not calling it at all.
-    if (lbCtx != null) return;
-    FileStatus[] finalResults = results.toArray(new FileStatus[results.size()]);
-    List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(
-        fs, finalResults, dpCtx, conf, hconf);
-    // create empty buckets if necessary
-    if (emptyBuckets.size() > 0) {
-      Utilities.createEmptyBuckets(hconf, emptyBuckets, conf, reporter);
-    }
-  }
-
-  private void tryDeleteAllMmFiles(FileSystem fs, Path specPath, Path manifestDir,
-      DynamicPartitionCtx dpCtx, ListBucketingCtx lbCtx, String unionSuffix,
-      ValidWriteIds.IdPathFilter filter) throws IOException {
-    FileStatus[] files = getMmDirectoryCandidates(fs, specPath, dpCtx, lbCtx, unionSuffix, filter);
-    if (files != null) {
-      for (FileStatus status : files) {
-        Utilities.LOG14535.info("Deleting " + status.getPath() + " on failure");
-        tryDelete(fs, status.getPath());
-      }
-    }
-    files = HiveStatsUtils.getFileStatusRecurse(manifestDir, 1, fs, filter);
-    if (files != null) {
-      for (FileStatus status : files) {
-        Utilities.LOG14535.info("Deleting " + status.getPath() + " on failure");
-        tryDelete(fs, status.getPath());
-      }
-    }
-  }
-
-
-  private void tryDelete(FileSystem fs, Path path) {
-    try {
-      fs.delete(path, true);
-    } catch (IOException ex) {
-      LOG.error("Failed to delete " + path, ex);
-    }
-  }
-
   @Override
   public OperatorType getType() {
     return OperatorType.FILESINK;
@@ -1427,7 +1269,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     for (Map.Entry<String, FSPaths> entry : valToPaths.entrySet()) {
       String fspKey = entry.getKey();     // DP/LB
       FSPaths fspValue = entry.getValue();
-      // TODO# useful code as reference, as it takes apart the crazy paths
       // for bucketed tables, hive.optimize.sort.dynamic.partition optimization
       // adds the taskId to the fspKey.
       if (conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) {

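The manifest-writing code removed from this operator now lives in Utilities.writeMmCommitManifest (see the Utilities.java hunk below); the naming scheme is unchanged. A standalone sketch of how a task's manifest path is composed (values hypothetical; the real prefix comes from ValidWriteIds.getMmFilePrefix(mmWriteId)):

    import org.apache.hadoop.fs.Path;

    public class MmManifestPathSketch {
      public static void main(String[] args) {
        Path specPath = new Path("/warehouse/t");  // hypothetical FSOP spec path
        String unionSuffix = null;                 // set for union subqueries, else null
        String mmPrefix = "mm_17";                 // hypothetical getMmFilePrefix(17L) result
        String taskId = "000000_0";                // hypothetical task id

        // One manifest per task, placed at the spec path (or under the union suffix);
        // it is created with overwrite=false so a collision fails loudly.
        Path manifestDir = (unionSuffix == null) ? specPath : new Path(specPath, unionSuffix);
        Path manifestPath = new Path(manifestDir,
            "_tmp." + mmPrefix + "_" + taskId + ".manifest");
        System.out.println(manifestPath);
      }
    }
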
http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 9bc4836..f2b8ca3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -256,6 +256,9 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
 
   @Override
   public int execute(DriverContext driverContext) {
+    Utilities.LOG14535.info("Executing MoveWork " + System.identityHashCode(work)
+        + " with " + work.getLoadFileWork() + "; " + work.getLoadTableWork() + "; "
+        + work.getLoadMultiFilesWork(), new Exception());
 
     try {
       if (driverContext.getCtx().getExplainAnalyze() == AnalyzeState.RUNNING) {
@@ -315,15 +318,14 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
 
         boolean isAcid = work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID;
         if (tbd.isMmTable() && isAcid) {
-          // TODO# need to make sure ACID writes to final directories. Otherwise, might need to move.
-          throw new HiveException("ACID and MM are not supported");
+           throw new HiveException("ACID and MM are not supported");
         }
 
         // Create a data container
         DataContainer dc = null;
         if (tbd.getPartitionSpec().size() == 0) {
           dc = new DataContainer(table.getTTable());
-          Utilities.LOG14535.info("loadTable called from " + tbd.getSourcePath() + " into " + tbd.getTable());
+          Utilities.LOG14535.info("loadTable called from " + tbd.getSourcePath() + " into " + tbd.getTable().getTableName(), new Exception());
           db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getReplace(),
               work.isSrcLocal(), isSkewedStoredAsDirs(tbd), isAcid, hasFollowingStatsTask(),
               tbd.getMmWriteId());

http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
index e3cb765..835791b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
@@ -75,6 +75,7 @@ public class OrcFileMergeOperator extends
   private void processKeyValuePairs(Object key, Object value)
       throws HiveException {
     String filePath = "";
+    boolean exception = false;
     try {
       OrcFileValueWrapper v;
       OrcFileKeyWrapper k;
@@ -87,12 +88,15 @@ public class OrcFileMergeOperator extends
       // skip incompatible file, files that are missing stripe statistics are set to incompatible
       if (k.isIncompatFile()) {
         LOG.warn("Incompatible ORC file merge! Stripe statistics is missing. " + k.getInputPath());
-        incompatFileSet.add(k.getInputPath());
+        addIncompatibleFile(k.getInputPath());
         return;
       }
 
       filePath = k.getInputPath().toUri().getPath();
 
+      Utilities.LOG14535.info("OrcFileMergeOperator processing " + filePath, new Exception());
+
+
       fixTmpPath(k.getInputPath().getParent());
 
       v = (OrcFileValueWrapper) value;
@@ -126,6 +130,7 @@ public class OrcFileMergeOperator extends
           options.bufferSize(compressBuffSize).enforceBufferSize();
         }
 
+        Path outPath = getOutPath();
         outWriter = OrcFile.createWriter(outPath, options);
         if (isLogDebugEnabled) {
           LOG.info("ORC merge file output path: " + outPath);
@@ -133,7 +138,7 @@ public class OrcFileMergeOperator extends
       }
 
       if (!checkCompatibility(k)) {
-        incompatFileSet.add(k.getInputPath());
+        addIncompatibleFile(k.getInputPath());
         return;
       }
 
@@ -164,7 +169,7 @@ public class OrcFileMergeOperator extends
         outWriter.appendUserMetadata(v.getUserMetadata());
       }
     } catch (Throwable e) {
-      this.exception = true;
+      exception = true;
       LOG.error("Closing operator..Exception: " + ExceptionUtils.getStackTrace(e));
       throw new HiveException(e);
     } finally {

http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java
index 4dea1d2..349b459 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java
@@ -77,7 +77,7 @@ public class RCFileMergeOperator
         codec = key.getCodec();
         columnNumber = key.getKeyBuffer().getColumnNumber();
         RCFileOutputFormat.setColumnNumber(jc, columnNumber);
-        outWriter = new RCFile.Writer(fs, jc, outPath, null, codec);
+        outWriter = new RCFile.Writer(fs, jc, getOutPath(), null, codec);
       }
 
       boolean sameCodec = ((codec == key.getCodec()) || codec.getClass().equals(
@@ -94,7 +94,6 @@ public class RCFileMergeOperator
           key.getRecordLength(), key.getKeyLength(),
           key.getCompressedKeyLength());
     } catch (Throwable e) {
-      this.exception = true;
       closeOp(true);
       throw new HiveException(e);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index d343e32..49bdd84 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -83,6 +83,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -94,6 +96,7 @@ import org.apache.hadoop.hive.common.HiveInterruptUtils;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.ValidWriteIds;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
@@ -146,6 +149,7 @@ import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
@@ -1411,7 +1415,7 @@ public final class Utilities {
       boolean success, Logger log, DynamicPartitionCtx dpCtx, FileSinkDesc conf,
       Reporter reporter) throws IOException,
       HiveException {
-
+    
     FileSystem fs = specPath.getFileSystem(hconf);
     Path tmpPath = Utilities.toTempPath(specPath);
     Path taskTmpPath = Utilities.toTaskTempPath(specPath);
@@ -1422,12 +1426,14 @@ public final class Utilities {
         PerfLogger perfLogger = SessionState.getPerfLogger();
         perfLogger.PerfLogBegin("FileSinkOperator", "RemoveTempOrDuplicateFiles");
         // remove any tmp file or double-committed output files
-        List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(fs, statuses, dpCtx, conf, hconf);
+        List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(
+            fs, statuses, dpCtx, conf, hconf);
         perfLogger.PerfLogEnd("FileSinkOperator", "RemoveTempOrDuplicateFiles");
         // create empty buckets if necessary
         if (emptyBuckets.size() > 0) {
           perfLogger.PerfLogBegin("FileSinkOperator", "CreateEmptyBuckets");
-          createEmptyBuckets(hconf, emptyBuckets, conf, reporter);
+          createEmptyBuckets(
+              hconf, emptyBuckets, conf.getCompressed(), conf.getTableInfo(), reporter);
           perfLogger.PerfLogEnd("FileSinkOperator", "CreateEmptyBuckets");
         }
         // move to the file destination
@@ -1457,7 +1463,7 @@ public final class Utilities {
    * @throws IOException
    */
   static void createEmptyBuckets(Configuration hconf, List<Path> paths,
-      FileSinkDesc conf, Reporter reporter)
+      boolean isCompressed, TableDesc tableInfo, Reporter reporter)
       throws HiveException, IOException {
 
     JobConf jc;
@@ -1469,13 +1475,11 @@ public final class Utilities {
     }
     HiveOutputFormat<?, ?> hiveOutputFormat = null;
     Class<? extends Writable> outputClass = null;
-    boolean isCompressed = conf.getCompressed();
-    TableDesc tableInfo = conf.getTableInfo();
     try {
       Serializer serializer = (Serializer) tableInfo.getDeserializerClass().newInstance();
       serializer.initialize(null, tableInfo.getProperties());
       outputClass = serializer.getSerializedClass();
-      hiveOutputFormat = HiveFileFormatUtils.getHiveOutputFormat(hconf, conf.getTableInfo());
+      hiveOutputFormat = HiveFileFormatUtils.getHiveOutputFormat(hconf, tableInfo);
     } catch (SerDeException e) {
       throw new HiveException(e);
     } catch (InstantiationException e) {
@@ -1518,13 +1522,21 @@ public final class Utilities {
    */
   public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats,
       DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf) throws IOException {
+    int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(),
+        numBuckets = (conf != null && conf.getTable() != null)
+          ? conf.getTable().getNumBuckets() : 0;
+    return removeTempOrDuplicateFiles(fs, fileStats, dpLevels, numBuckets, hconf);
+  }
+
+  public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats,
+      int dpLevels, int numBuckets, Configuration hconf) throws IOException {
     if (fileStats == null) {
       return null;
     }
 
     List<Path> result = new ArrayList<Path>();
     HashMap<String, FileStatus> taskIDToFile = null;
-    if (dpCtx != null) {
+    if (dpLevels > 0) {
       FileStatus parts[] = fileStats;
 
       for (int i = 0; i < parts.length; ++i) {
@@ -1543,14 +1555,14 @@ public final class Utilities {
 
         taskIDToFile = removeTempOrDuplicateFiles(items, fs);
         // if the table is bucketed and enforce bucketing, we should check and generate all buckets
-        if (dpCtx.getNumBuckets() > 0 && taskIDToFile != null && !"tez".equalsIgnoreCase(hconf.get(ConfVars.HIVE_EXECUTION_ENGINE.varname))) {
+        if (numBuckets > 0 && taskIDToFile != null && !"tez".equalsIgnoreCase(hconf.get(ConfVars.HIVE_EXECUTION_ENGINE.varname))) {
           // refresh the file list
           items = fs.listStatus(parts[i].getPath());
           // get the missing buckets and generate empty buckets
           String taskID1 = taskIDToFile.keySet().iterator().next();
           Path bucketPath = taskIDToFile.values().iterator().next().getPath();
           Utilities.LOG14535.info("Bucket path " + bucketPath);
-          for (int j = 0; j < dpCtx.getNumBuckets(); ++j) {
+          for (int j = 0; j < numBuckets; ++j) {
             addBucketFileIfMissing(result, taskIDToFile, taskID1, bucketPath, j);
           }
         }
@@ -1561,13 +1573,13 @@ public final class Utilities {
         return result;
       }
       taskIDToFile = removeTempOrDuplicateFiles(items, fs);
-      if(taskIDToFile != null && taskIDToFile.size() > 0 && conf != null && conf.getTable() != null
-          && (conf.getTable().getNumBuckets() > taskIDToFile.size()) && !"tez".equalsIgnoreCase(hconf.get(ConfVars.HIVE_EXECUTION_ENGINE.varname))) {
+      if(taskIDToFile != null && taskIDToFile.size() > 0 && (numBuckets > taskIDToFile.size())
+          && !"tez".equalsIgnoreCase(hconf.get(ConfVars.HIVE_EXECUTION_ENGINE.varname))) {
           // get the missing buckets and generate empty buckets for non-dynamic partition
         String taskID1 = taskIDToFile.keySet().iterator().next();
         Path bucketPath = taskIDToFile.values().iterator().next().getPath();
         Utilities.LOG14535.info("Bucket path " + bucketPath);
-        for (int j = 0; j < conf.getTable().getNumBuckets(); ++j) {
+        for (int j = 0; j < numBuckets; ++j) {
           addBucketFileIfMissing(result, taskIDToFile, taskID1, bucketPath, j);
         }
       }
@@ -3746,4 +3758,191 @@ public final class Utilities {
     String suffix = "KMGTPE".charAt(exp-1) + "";
     return String.format("%.2f%sB", bytes / Math.pow(unit, exp), suffix);
   }
+
+  private static final String MANIFEST_EXTENSION = ".manifest";
+
+  private static Path getManifestDir(Path specPath, String unionSuffix) {
+    return (unionSuffix == null) ? specPath : new Path(specPath, unionSuffix);
+  }
+
+  private static void tryDelete(FileSystem fs, Path path) {
+    try {
+      fs.delete(path, true);
+    } catch (IOException ex) {
+      LOG.error("Failed to delete " + path, ex);
+    }
+  }
+
+  private static FileStatus[] getMmDirectoryCandidates(FileSystem fs, Path path,
+      int dpLevels, int lbLevels, String unionSuffix, PathFilter filter) throws IOException {
+    StringBuilder sb = new StringBuilder(path.toUri().getPath());
+    for (int i = 0; i < dpLevels + lbLevels; i++) {
+      sb.append(Path.SEPARATOR).append("*");
+    }
+    if (unionSuffix != null) {
+      sb.append(Path.SEPARATOR).append(unionSuffix);
+    }
+    sb.append(Path.SEPARATOR).append("*"); // TODO: we could add exact mm prefix here
+    Utilities.LOG14535.info("Looking for files via: " + sb.toString());
+    Path pathPattern = new Path(path, sb.toString());
+    return fs.globStatus(pathPattern, filter);
+  }
+
+  private static void tryDeleteAllMmFiles(FileSystem fs, Path specPath, Path manifestDir,
+      int dpLevels, int lbLevels, String unionSuffix, ValidWriteIds.IdPathFilter filter)
+          throws IOException {
+    FileStatus[] files = getMmDirectoryCandidates(
+        fs, specPath, dpLevels, lbLevels, unionSuffix, filter);
+    if (files != null) {
+      for (FileStatus status : files) {
+        Utilities.LOG14535.info("Deleting " + status.getPath() + " on failure");
+        tryDelete(fs, status.getPath());
+      }
+    }
+    files = HiveStatsUtils.getFileStatusRecurse(manifestDir, 1, fs, filter);
+    if (files != null) {
+      for (FileStatus status : files) {
+        Utilities.LOG14535.info("Deleting " + status.getPath() + " on failure");
+        tryDelete(fs, status.getPath());
+      }
+    }
+  }
+
+
+  public static void writeMmCommitManifest(List<Path> commitPaths, Path specPath, FileSystem fs,
+      String taskId, Long mmWriteId, String unionSuffix) throws HiveException {
+    if (commitPaths.isEmpty()) return;
+    Path manifestPath = getManifestDir(specPath, unionSuffix);
+    manifestPath = new Path(manifestPath, "_tmp." + ValidWriteIds.getMmFilePrefix(
+        mmWriteId) + "_" + taskId + MANIFEST_EXTENSION);
+    Utilities.LOG14535.info("Writing manifest to " + manifestPath + " with " + commitPaths);
+    try {
+      // Don't overwrite the manifest... should fail if we have collisions.
+      // We assume one FSOP per task (per specPath), so we create it in specPath.
+      try (FSDataOutputStream out = fs.create(manifestPath, false)) {
+        if (out == null) {
+          throw new HiveException("Failed to create manifest at " + manifestPath);
+        }
+        out.writeInt(commitPaths.size());
+        for (Path path : commitPaths) {
+          out.writeUTF(path.toString());
+        }
+      }
+    } catch (IOException e) {
+      throw new HiveException(e);
+    }
+  }
+
+  public static final class MissingBucketsContext {
+    public final TableDesc tableInfo;
+    public final int numBuckets;
+    public final boolean isCompressed;
+    public MissingBucketsContext(TableDesc tableInfo, int numBuckets, boolean isCompressed) {
+      this.tableInfo = tableInfo;
+      this.numBuckets = numBuckets;
+      this.isCompressed = isCompressed;
+    }
+  }
+
+  public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Configuration hconf,
+      boolean success, int dpLevels, int lbLevels, MissingBucketsContext mbc, long mmWriteId,
+      Reporter reporter) throws IOException, HiveException {
+    FileSystem fs = specPath.getFileSystem(hconf);
+    // Manifests would be at the root level, but the results at target level.
+    // TODO# special case - doesn't take bucketing into account
+    Path manifestDir = getManifestDir(specPath, unionSuffix);
+
+    ValidWriteIds.IdPathFilter filter = new ValidWriteIds.IdPathFilter(mmWriteId, true);
+    if (!success) {
+      tryDeleteAllMmFiles(fs, specPath, manifestDir, dpLevels, lbLevels, unionSuffix, filter);
+      return;
+    }
+    FileStatus[] files = HiveStatsUtils.getFileStatusRecurse(manifestDir, 1, fs, filter);
+    Utilities.LOG14535.info("Looking for manifests in: " + manifestDir + " (" + mmWriteId + ")");
+    List<Path> manifests = new ArrayList<>();
+    if (files != null) {
+      for (FileStatus status : files) {
+        Path path = status.getPath();
+        if (path.getName().endsWith(MANIFEST_EXTENSION)) {
+          Utilities.LOG14535.info("Reading manifest " + path);
+          manifests.add(path);
+        }
+      }
+    }
+
+    Utilities.LOG14535.info("Looking for files in: " + specPath);
+    files = getMmDirectoryCandidates(fs, specPath, dpLevels, lbLevels, unionSuffix, filter);
+    ArrayList<FileStatus> results = new ArrayList<>();
+    if (files != null) {
+      for (FileStatus status : files) {
+        Path path = status.getPath();
+        Utilities.LOG14535.info("Looking at path: " + path);
+        if (!status.isDirectory()) {
+          if (!path.getName().endsWith(MANIFEST_EXTENSION)) {
+            Utilities.LOG14535.warn("Unknown file found, deleting: " + path);
+            tryDelete(fs, path);
+          }
+        } else {
+          results.add(status);
+        }
+      }
+    }
+
+    HashSet<String> committed = new HashSet<>();
+    for (Path mfp : manifests) {
+      try (FSDataInputStream mdis = fs.open(mfp)) {
+        int fileCount = mdis.readInt();
+        for (int i = 0; i < fileCount; ++i) {
+          String nextFile = mdis.readUTF();
+          if (!committed.add(nextFile)) {
+            throw new HiveException(nextFile + " was specified in multiple manifests");
+          }
+        }
+      }
+    }
+
+    for (FileStatus status : results) {
+      for (FileStatus child : fs.listStatus(status.getPath())) {
+        Path childPath = child.getPath();
+        if (committed.remove(childPath.toString())) continue; // A good file.
+        Utilities.LOG14535.info("Deleting " + childPath + " that was not committed");
+        // We should actually succeed here - if we fail, don't commit the query.
+        if (!fs.delete(childPath, true)) {
+          throw new HiveException("Failed to delete an uncommitted path " + childPath);
+        }
+      }
+    }
+
+    if (!committed.isEmpty()) {
+      throw new HiveException("The following files were committed but not found: " + committed);
+    }
+    for (Path mfp : manifests) {
+      Utilities.LOG14535.info("Deleting manifest " + mfp);
+      tryDelete(fs, mfp);
+    }
+    // Delete the manifest directory if we only created it for manifests; otherwise the
+    // dynamic partition loader will find it and try to load it as a partition... what a mess.
+    if (manifestDir != specPath) {
+      FileStatus[] remainingFiles = fs.listStatus(manifestDir);
+      if (remainingFiles == null || remainingFiles.length == 0) {
+        Utilities.LOG14535.info("Deleting directory " + manifestDir);
+        tryDelete(fs, manifestDir);
+      }
+    }
+
+    if (results.isEmpty()) return;
+
+    // TODO: see HIVE-14886 - removeTempOrDuplicateFiles is broken for list bucketing,
+    //       so maintain parity here by not calling it at all.
+    if (lbLevels != 0) return;
+    FileStatus[] finalResults = results.toArray(new FileStatus[results.size()]);
+    List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(
+        fs, finalResults, dpLevels, mbc == null ? 0 : mbc.numBuckets, hconf);
+    // create empty buckets if necessary
+    if (emptyBuckets.size() > 0) {
+      assert mbc != null;
+      Utilities.createEmptyBuckets(hconf, emptyBuckets, mbc.isCompressed, mbc.tableInfo, reporter);
+    }
+  }
+
 }

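For reference, the manifests written by writeMmCommitManifest and read back in handleMmTableFinalPath use a very small binary format: an int count followed by that many writeUTF-encoded paths. A minimal, self-contained round-trip sketch of that format (illustrative only; the file name and paths below are hypothetical):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class MmManifestFormatSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        Path manifest = new Path("/tmp/example.manifest");            // hypothetical location
        List<String> committed = Arrays.asList("/warehouse/t/mm_17/000000_0.merged");

        // Write: count, then one UTF entry per committed file (mirrors writeMmCommitManifest).
        try (FSDataOutputStream out = fs.create(manifest, false)) {   // no overwrite, as in the patch
          out.writeInt(committed.size());
          for (String p : committed) {
            out.writeUTF(p);
          }
        }

        // Read: the inverse loop handleMmTableFinalPath uses to build its "committed" set.
        try (FSDataInputStream in = fs.open(manifest)) {
          int count = in.readInt();
          for (int i = 0; i < count; ++i) {
            System.out.println("committed: " + in.readUTF());
          }
        }
      }
    }
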
http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
index bd537cd..d013c6f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
@@ -234,6 +234,7 @@ public class ColumnTruncateMapper extends MapReduceBase implements
       ) throws HiveException, IOException {
     FileSystem fs = outputPath.getFileSystem(job);
     Path backupPath = backupOutputPath(fs, outputPath, job);
+    // TODO# special case - what is this about?
     Utilities.mvFileToFinalPath(outputPath, job, success, LOG, dynPartCtx, null,
       reporter);
     fs.delete(backupPath, true);

http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index e66948f..9a1c1fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1842,6 +1842,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
         if (!s.isDirectory()) {
           throw new HiveException("partition " + s.getPath() + " is not a directory!");
         }
+        Utilities.LOG14535.info("Found DP " + s.getPath());
         validPartitions.add(s.getPath());
       }
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 675bfd0..79ef4d0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -93,6 +93,7 @@ import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.FileMergeDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc.SampleDesc;
+import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
@@ -1256,23 +1257,28 @@ public final class GenMapRedUtils {
    List<Task<MoveWork>> mvTasks, HiveConf conf,
    Task<? extends Serializable> currTask) throws SemanticException {
 
+
     //
     // 1. create the operator tree
     //
     FileSinkDesc fsInputDesc = fsInput.getConf();
+    Utilities.LOG14535.info("Creating merge work from " + System.identityHashCode(fsInput)
+        + " with write ID " + (fsInputDesc.isMmTable() ? fsInputDesc.getMmWriteId() : null) + " into " + finalName);
 
     // Create a TableScan operator
     RowSchema inputRS = fsInput.getSchema();
     TableScanOperator tsMerge = GenMapRedUtils.createTemporaryTableScanOperator(
         fsInput.getCompilationOpContext(), inputRS);
 
+    Long srcMmWriteId = fsInputDesc.isMmTable() ? fsInputDesc.getMmWriteId() : null;
+
     // Create a FileSink operator
     TableDesc ts = (TableDesc) fsInputDesc.getTableInfo().clone();
-    // TODO# special case #N - merge FS is created here
-    FileSinkDesc fsOutputDesc = new FileSinkDesc(finalName, ts,
-      conf.getBoolVar(ConfVars.COMPRESSRESULT));
-    FileSinkOperator fsOutput = (FileSinkOperator) OperatorFactory.getAndMakeChild(
-      fsOutputDesc, inputRS, tsMerge);
+    FileSinkDesc fsOutputDesc = new FileSinkDesc(
+        finalName, ts, conf.getBoolVar(ConfVars.COMPRESSRESULT));
+    fsOutputDesc.setMmWriteId(srcMmWriteId);
+    // Create and attach the filesink for the merge. We don't actually need it for anything here.
+    OperatorFactory.getAndMakeChild(fsOutputDesc, inputRS, tsMerge);
 
     // If the input FileSinkOperator is a dynamic partition enabled, the tsMerge input schema
     // needs to include the partition column, and the fsOutput should have
@@ -1305,9 +1311,7 @@ public final class GenMapRedUtils {
     //
     // 2. Constructing a conditional task consisting of a move task and a map reduce task
     //
-    // TODO# movetask is created here; handle MM tables
-    MoveWork dummyMv = new MoveWork(null, null, null,
-         new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, null), false);
+    Path inputDirName = fsInputDesc.getMergeInputDirName();
     MapWork cplan;
     Serializable work;
 
@@ -1348,8 +1352,15 @@ public final class GenMapRedUtils {
     cplan.setInputformat("org.apache.hadoop.hive.ql.io.CombineHiveInputFormat");
     // NOTE: we should gather stats in MR1 rather than MR2 at merge job since we don't
     // know if merge MR2 will be triggered at execution time
+    MoveWork dummyMv = null;
+    if (srcMmWriteId == null) {
+      // Only create the movework for a non-MM table. No action is needed for an MM table.
+      Utilities.LOG14535.info("creating dummy movetask for merge (with lfd)");
+      dummyMv = new MoveWork(null, null, null,
+         new LoadFileDesc(inputDirName, finalName, true, null, null), false);
+    }
     ConditionalTask cndTsk = GenMapRedUtils.createCondTask(conf, currTask, dummyMv, work,
-        fsInputDesc.getFinalDirName().toString());
+        fsInputDesc.getMergeInputDirName().toString());
 
     // keep the dynamic partition context in conditional task resolver context
     ConditionalResolverMergeFilesCtx mrCtx =
@@ -1360,7 +1371,13 @@ public final class GenMapRedUtils {
     //
     // 3. add the moveTask as the children of the conditional task
     //
-    linkMoveTask(fsOutput, cndTsk, mvTasks, conf, dependencyTask);
+    // Use the original fsOp path here in case of MM - while the new FSOP merges files inside the
+    // MM directory, the original MoveTask still commits based on the parent. Note that this path
+    // can only be triggered for a merge that's part of insert for now; MM tables do not support
+    // concatenate. Keeping the old logic for non-MM tables with temp directories and stuff.
+    Path fsopPath = srcMmWriteId != null
+        ? fsInputDesc.getFinalDirName() : fsOutputDesc.getFinalDirName();
+    linkMoveTask(fsopPath, cndTsk, mvTasks, conf, dependencyTask);
   }
 
   /**
@@ -1373,11 +1390,11 @@ public final class GenMapRedUtils {
    * @param hconf
    * @param dependencyTask
    */
-  public static void linkMoveTask(FileSinkOperator newOutput,
+  private static void linkMoveTask(Path fsopPath,
       ConditionalTask cndTsk, List<Task<MoveWork>> mvTasks, HiveConf hconf,
       DependencyCollectionTask dependencyTask) {
 
-    Task<MoveWork> mvTask = GenMapRedUtils.findMoveTask(mvTasks, newOutput);
+    Task<MoveWork> mvTask = GenMapRedUtils.findMoveTaskForFsopOutput(mvTasks, fsopPath);
 
     for (Task<? extends Serializable> tsk : cndTsk.getListTasks()) {
       linkMoveTask(mvTask, tsk, hconf, dependencyTask);
@@ -1392,7 +1409,7 @@ public final class GenMapRedUtils {
    * @param hconf
    * @param dependencyTask
    */
-  public static void linkMoveTask(Task<MoveWork> mvTask,
+  private static void linkMoveTask(Task<MoveWork> mvTask,
       Task<? extends Serializable> task, HiveConf hconf,
       DependencyCollectionTask dependencyTask) {
 
@@ -1527,10 +1544,11 @@ public final class GenMapRedUtils {
     TableScanOperator topOp,  FileSinkDesc fsDesc) {
 
     ArrayList<String> aliases = new ArrayList<String>();
-    Path inputDir = fsDesc.getFinalDirName();
+    Path inputDir = fsDesc.getMergeInputDirName();
     TableDesc tblDesc = fsDesc.getTableInfo();
     aliases.add(inputDir.toString()); // dummy alias: just use the input path
 
+    Utilities.LOG14535.info("createMRWorkForMergingFiles for " + inputDir);
     // constructing the default MapredWork
     MapredWork cMrPlan = GenMapRedUtils.getMapRedWorkFromConf(conf);
     MapWork cplan = cMrPlan.getMapWork();
@@ -1555,8 +1573,9 @@ public final class GenMapRedUtils {
    */
   public static MapWork createMergeTask(FileSinkDesc fsInputDesc, Path finalName,
       boolean hasDynamicPartitions, CompilationOpContext ctx) throws SemanticException {
+    
+    Path inputDir = fsInputDesc.getMergeInputDirName();
 
-    Path inputDir = fsInputDesc.getFinalDirName();
     TableDesc tblDesc = fsInputDesc.getTableInfo();
 
     List<Path> inputDirs = new ArrayList<Path>(1);
@@ -1580,6 +1599,7 @@ public final class GenMapRedUtils {
           + " format other than RCFile or ORCFile");
     }
 
+    Utilities.LOG14535.info("creating mergefilework from " + inputDirs + " to " + finalName);
     // create the merge file work
     MergeFileWork work = new MergeFileWork(inputDirs, finalName,
         hasDynamicPartitions, tblDesc.getInputFileFormatClass().getName());
@@ -1602,6 +1622,7 @@ public final class GenMapRedUtils {
     } else {
       fmd = new OrcFileMergeDesc();
     }
+    fmd.setMmWriteId(fsInputDesc.getMmWriteId());
     fmd.setDpCtx(fsInputDesc.getDynPartCtx());
     fmd.setOutputPath(finalName);
     fmd.setHasDynamicPartitions(work.hasDynamicPartitions());
@@ -1635,6 +1656,7 @@ public final class GenMapRedUtils {
   public static ConditionalTask createCondTask(HiveConf conf,
       Task<? extends Serializable> currTask, MoveWork mvWork,
       Serializable mergeWork, String inputPath) {
+    Utilities.LOG14535.info("Creating conditional merge task for " + 
inputPath);
 
     // There are 3 options for this ConditionalTask:
     // 1) Merge the partitions
@@ -1642,10 +1664,14 @@ public final class GenMapRedUtils {
     // 3) Merge some partitions and move other partitions (i.e. merge some 
partitions and don't
     // merge others) in this case the merge is done first followed by the move 
to prevent
     // conflicts.
+    // TODO: if we are not dealing with concatenate DDL, we should not create 
a merge+move path
+    //       because it should be impossible to get incompatible outputs.
+    // Create a dummy task if no move is needed.
+    Serializable moveWork = mvWork != null ? mvWork : new 
DependencyCollectionWork();
     Task<? extends Serializable> mergeOnlyMergeTask = 
TaskFactory.get(mergeWork, conf);
-    Task<? extends Serializable> moveOnlyMoveTask = TaskFactory.get(mvWork, 
conf);
+    Task<? extends Serializable> moveOnlyMoveTask = TaskFactory.get(moveWork, 
conf);
     Task<? extends Serializable> mergeAndMoveMergeTask = 
TaskFactory.get(mergeWork, conf);
-    Task<? extends Serializable> mergeAndMoveMoveTask = 
TaskFactory.get(mvWork, conf);
+    Task<? extends Serializable> mergeAndMoveMoveTask = 
TaskFactory.get(moveWork, conf);
 
     // NOTE! It is necessary merge task is the parent of the move task, and not
     // the other way around, for the proper execution of the execute method of
@@ -1653,7 +1679,7 @@ public final class GenMapRedUtils {
     mergeAndMoveMergeTask.addDependentTask(mergeAndMoveMoveTask);
 
     List<Serializable> listWorks = new ArrayList<Serializable>();
-    listWorks.add(mvWork);
+    listWorks.add(moveWork);
     listWorks.add(mergeWork);
 
     ConditionalWork cndWork = new ConditionalWork(listWorks);
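
For orientation, the hunks above reduce to picking one of the three outcomes described in the comment, with the dummy DependencyCollectionWork standing in so the move branches stay well-formed even when the query needs no MoveTask at all. Below is a minimal, self-contained sketch of that branch selection in plain Java; the enum and class names are illustrative assumptions, not Hive's TaskFactory/ConditionalWork API.

  // Illustrative sketch of the three ConditionalTask outcomes described above.
  enum MergePlan { MERGE_ALL, MOVE_ALL, MERGE_SOME_MOVE_OTHERS }

  final class MergePlanChooser {
    /**
     * @param partitionsNeedingMerge number of output partitions with many small files
     * @param totalPartitions        total number of output partitions
     */
    static MergePlan choose(int partitionsNeedingMerge, int totalPartitions) {
      if (partitionsNeedingMerge == 0) {
        return MergePlan.MOVE_ALL;                 // option 2: just move
      }
      if (partitionsNeedingMerge == totalPartitions) {
        return MergePlan.MERGE_ALL;                // option 1: merge everything
      }
      // option 3: merge some partitions, move the rest; merge runs first to avoid conflicts
      return MergePlan.MERGE_SOME_MOVE_OTHERS;
    }
  }
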
@@ -1689,8 +1715,8 @@ public final class GenMapRedUtils {
         .isSkewedStoredAsDir();
   }
 
-  public static Task<MoveWork> findMoveTask(
-      List<Task<MoveWork>> mvTasks, FileSinkOperator fsOp) {
+  public static Task<MoveWork> findMoveTaskForFsopOutput(
+      List<Task<MoveWork>> mvTasks, Path fsopFinalDir) {
     // find the move task
     for (Task<MoveWork> mvTsk : mvTasks) {
       MoveWork mvWork = mvTsk.getWork();
@@ -1700,9 +1726,10 @@ public final class GenMapRedUtils {
       } else if (mvWork.getLoadTableWork() != null) {
         srcDir = mvWork.getLoadTableWork().getSourcePath();
       }
+      Utilities.LOG14535.info("Observing MoveWork " + 
System.identityHashCode(mvWork)
+          + " with " + srcDir + " while looking for " + fsopFinalDir);
 
-      if ((srcDir != null)
-          && (srcDir.equals(fsOp.getConf().getFinalDirName()))) {
+      if ((srcDir != null) && srcDir.equals(fsopFinalDir)) {
         return mvTsk;
       }
     }
@@ -1722,59 +1749,58 @@ public final class GenMapRedUtils {
       Task<? extends Serializable> currTask, boolean isInsertTable) {
 
     // Has the user enabled merging of files for map-only jobs or for all jobs
-    if ((mvTasks != null) && (!mvTasks.isEmpty())) {
-
-      // no need of merging if the move is to a local file system
-      MoveTask mvTask = (MoveTask) GenMapRedUtils.findMoveTask(mvTasks, fsOp);
-
-      if (mvTask != null && isInsertTable && 
hconf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)
-          && !fsOp.getConf().isMaterialization()) {
-        // mark the MapredWork and FileSinkOperator for gathering stats
-        fsOp.getConf().setGatherStats(true);
-        
fsOp.getConf().setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE));
-        if (!mvTask.hasFollowingStatsTask()) {
-          GenMapRedUtils.addStatsTask(fsOp, mvTask, currTask, hconf);
-        }
+    if (mvTasks == null  || mvTasks.isEmpty()) return false;
+
+    // no need of merging if the move is to a local file system
+    // The lookup is based on the original FSOP, so use its original path as is.
+    MoveTask mvTask = (MoveTask) GenMapRedUtils.findMoveTaskForFsopOutput(
+        mvTasks, fsOp.getConf().getFinalDirName());
+
+    // TODO: this stats-gathering setup has nothing to do with merging; move it out of this method.
+    if (mvTask != null && isInsertTable && 
hconf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)
+        && !fsOp.getConf().isMaterialization()) {
+      // mark the MapredWork and FileSinkOperator for gathering stats
+      fsOp.getConf().setGatherStats(true);
+      
fsOp.getConf().setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE));
+      if (!mvTask.hasFollowingStatsTask()) {
+        GenMapRedUtils.addStatsTask(fsOp, mvTask, currTask, hconf);
       }
+    }
 
-      if ((mvTask != null) && !mvTask.isLocal() && 
fsOp.getConf().canBeMerged()) {
+    if (mvTask == null || mvTask.isLocal() || !fsOp.getConf().canBeMerged()) 
return false;
 
-        if (currTask.getWork() instanceof TezWork) {
-          // tez blurs the boundary between map and reduce, thus it has it's 
own
-          // config
-          return hconf.getBoolVar(ConfVars.HIVEMERGETEZFILES);
-        } else if (currTask.getWork() instanceof SparkWork) {
-          // spark has its own config for merging
-          return hconf.getBoolVar(ConfVars.HIVEMERGESPARKFILES);
-        }
+    if (currTask.getWork() instanceof TezWork) {
+      // tez blurs the boundary between map and reduce, thus it has its own config
+      return hconf.getBoolVar(ConfVars.HIVEMERGETEZFILES);
+    } else if (currTask.getWork() instanceof SparkWork) {
+      // spark has its own config for merging
+      return hconf.getBoolVar(ConfVars.HIVEMERGESPARKFILES);
+    }
+    return isMergeRequiredForMr(hconf, fsOp, currTask);
+  }
 
-        if (fsOp.getConf().isLinkedFileSink()) {
-          // If the user has HIVEMERGEMAPREDFILES set to false, the idea was 
the
-          // number of reducers are few, so the number of files anyway are 
small.
-          // However, with this optimization, we are increasing the number of 
files
-          // possibly by a big margin. So, merge aggresively.
-          if (hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) ||
-              hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES)) {
-            return true;
-          }
-        } else {
-          // There are separate configuration parameters to control whether to
-          // merge for a map-only job
-          // or for a map-reduce job
-          if (currTask.getWork() instanceof MapredWork) {
-            ReduceWork reduceWork = ((MapredWork) 
currTask.getWork()).getReduceWork();
-            boolean mergeMapOnly =
-              hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && reduceWork == 
null;
-            boolean mergeMapRed =
-              hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES) &&
-              reduceWork != null;
-            if (mergeMapOnly || mergeMapRed) {
-              return true;
-            }
-          } else {
-            return false;
-          }
-        }
+  private static boolean isMergeRequiredForMr(HiveConf hconf,
+      FileSinkOperator fsOp, Task<? extends Serializable> currTask) {
+    if (fsOp.getConf().isLinkedFileSink()) {
+      // If the user has HIVEMERGEMAPREDFILES set to false, the idea was that the
+      // number of reducers is small, so the number of files is small anyway.
+      // However, with this optimization the number of files can increase by a
+      // big margin. So, merge aggressively.
+      return (hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) ||
+          hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES));
+    }
+    // There are separate configuration parameters to control whether to
+    // merge for a map-only job
+    // or for a map-reduce job
+    if (currTask.getWork() instanceof MapredWork) {
+      ReduceWork reduceWork = ((MapredWork) 
currTask.getWork()).getReduceWork();
+      boolean mergeMapOnly =
+        hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && reduceWork == null;
+      boolean mergeMapRed =
+        hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES) &&
+        reduceWork != null;
+      if (mergeMapOnly || mergeMapRed) {
+        return true;
       }
     }
     return false;
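
The restructured isMergeRequired/isMergeRequiredForMr above boils down to a small decision table over the hive.merge.* flags. A self-contained sketch of that table follows, assuming the engine and flag values are already known; the helper class itself is not part of Hive.

  // Which hive.merge.* flag gates the extra merge stage, per execution engine.
  final class MergeFlagTable {
    enum Engine { TEZ, SPARK, MR }

    static boolean mergeRequired(Engine engine, boolean hasReduceStage, boolean linkedFileSink,
        boolean mergeMapFiles,      // hive.merge.mapfiles
        boolean mergeMapRedFiles,   // hive.merge.mapredfiles
        boolean mergeTezFiles,      // hive.merge.tezfiles
        boolean mergeSparkFiles) {  // hive.merge.sparkfiles
      switch (engine) {
        case TEZ:   return mergeTezFiles;
        case SPARK: return mergeSparkFiles;
        default:    // MR
          if (linkedFileSink) {
            // linked sinks (unions) can multiply file counts, so either flag triggers a merge
            return mergeMapFiles || mergeMapRedFiles;
          }
          return hasReduceStage ? mergeMapRedFiles : mergeMapFiles;
      }
    }
  }
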
@@ -1798,36 +1824,38 @@ public final class GenMapRedUtils {
 
     Path dest = null;
 
+    FileSinkDesc fileSinkDesc = fsOp.getConf();
+    boolean isMmTable = fileSinkDesc.isMmTable();
     if (chDir) {
-      FileSinkDesc fileSinkDesc = fsOp.getConf();
-      dest = fileSinkDesc.getFinalDirName();
-
-      // generate the temporary file
-      // it must be on the same file system as the current destination
-      Context baseCtx = parseCtx.getContext();
-
-      // Create the required temporary file in the HDFS location if the 
destination
-      // path of the FileSinkOperator table is a blobstore path.
-      // TODO# special case #N - linked FDs (unions?)
-      Path tmpDir = baseCtx.getTempDirForPath(fileSinkDesc.getDestPath());
-
-      // Change all the linked file sink descriptors
-      if (fileSinkDesc.isLinkedFileSink()) {
-        for (FileSinkDesc fsConf:fileSinkDesc.getLinkedFileSinkDesc()) {
-          fsConf.setParentDir(tmpDir);
-          fsConf.setDirName(new Path(tmpDir, fsConf.getDirName().getName()));
-          Utilities.LOG14535.info("createMoveTask setting tmpDir for 
LinkedFileSink chDir " + fsConf.getDirName() + "; new parent " + tmpDir + ", 
dest was " + fileSinkDesc.getDestPath());
+
+      dest = fileSinkDesc.getMergeInputDirName();
+      if (!isMmTable) {
+        // generate the temporary file
+        // it must be on the same file system as the current destination
+        Context baseCtx = parseCtx.getContext();
+
+        // Create the required temporary file in the HDFS location if the 
destination
+        // path of the FileSinkOperator table is a blobstore path.
+        Path tmpDir = baseCtx.getTempDirForPath(fileSinkDesc.getDestPath());
+
+        // Change all the linked file sink descriptors
+        if (fileSinkDesc.isLinkedFileSink()) {
+          for (FileSinkDesc fsConf:fileSinkDesc.getLinkedFileSinkDesc()) {
+            fsConf.setParentDir(tmpDir);
+            fsConf.setDirName(new Path(tmpDir, fsConf.getDirName().getName()));
+            Utilities.LOG14535.info("createMoveTask setting tmpDir for 
LinkedFileSink chDir " + fsConf.getDirName() + "; new parent " + tmpDir + ", 
dest was " + fileSinkDesc.getDestPath());
+          }
+        } else {
+          fileSinkDesc.setDirName(tmpDir);
+          Utilities.LOG14535.info("createMoveTask setting tmpDir  chDir " + 
tmpDir + "; dest was " + fileSinkDesc.getDestPath());
         }
-      } else {
-        fileSinkDesc.setDirName(tmpDir);
-        Utilities.LOG14535.info("createMoveTask setting tmpDir for 
LinkedFileSink chDir " + tmpDir + "; dest was " + fileSinkDesc.getDestPath());
       }
     }
 
     Task<MoveWork> mvTask = null;
 
     if (!chDir) {
-      mvTask = GenMapRedUtils.findMoveTask(mvTasks, fsOp);
+      mvTask = GenMapRedUtils.findMoveTaskForFsopOutput(mvTasks, 
fsOp.getConf().getFinalDirName());
     }
 
     // Set the move task to be dependent on the current task
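
A compact sketch of the directory choice made in createMoveTask above, based only on the behavior visible in this hunk: MM tables write straight into the destination (the commit is governed by the write ID), while non-MM tables still stage output in a temp directory and rely on the MoveTask rename. The helper below is illustrative, not Hive API.

  import org.apache.hadoop.fs.Path;

  final class SinkDirChoice {
    /** Where the FileSinkOperator should write when a merge/move stage may follow. */
    static Path sinkDir(boolean isMmTable, Path destPath, Path tempDirForDest) {
      // MM: no rename-based commit, so no temp dir is generated for the sink.
      // non-MM: write to a temp dir on the same file system as the destination.
      return isMmTable ? destPath : tempDirForDest;
    }
  }
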

http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index 5348500..03c2e79 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -1586,6 +1586,10 @@ public class DDLSemanticAnalyzer extends 
BaseSemanticAnalyzer {
 
     try {
       tblObj = getTable(tableName);
+      // TODO: we should probably block all ACID tables here.
+      if (MetaStoreUtils.isMmTable(tblObj.getParameters())) {
+        throw new SemanticException("Merge is not supported for MM tables");
+      }
 
       List<String> bucketCols = null;
       Class<? extends InputFormat> inputFormatClass = null;
@@ -1676,9 +1680,8 @@ public class DDLSemanticAnalyzer extends 
BaseSemanticAnalyzer {
       LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc,
           partSpec == null ? new HashMap<String, String>() : partSpec);
       ltd.setLbCtx(lbCtx);
-      // TODO# movetask is created here; handle MM tables
-      Task<MoveWork> moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, 
null, false),
-          conf);
+      // No need to handle MM tables - unsupported path.
+      Task<MoveWork> moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, 
null, false), conf);
       mergeTask.addDependentTask(moveTsk);
 
       if (conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {

http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index 73cc95a..5c67fe2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -304,7 +304,6 @@ public class GenTezUtils {
         linked = context.linkedFileSinks.get(path);
         linked.add(desc);
 
-        // TODO# special case #N - unions (tez)
         desc.setDirName(new Path(path, "" + linked.size()));
         Utilities.LOG14535.info("removing union - new desc with " + 
desc.getDirName() + "; parent " + path);
         desc.setLinkedFileSink(true);
@@ -374,8 +373,7 @@ public class GenTezUtils {
       // If underlying data is RCFile or OrcFile, RCFileBlockMerge task or
       // OrcFileStripeMerge task would be created.
       LOG.info("using CombineHiveInputformat for the merge job");
-      Utilities.LOG14535.info("merging files from " + 
fileSink.getConf().getDirName() + " to " + finalName);
-      // TODO# special case #N - merge
+      Utilities.LOG14535.info("will generate MR work for merging files from " 
+ fileSink.getConf().getDirName() + " to " + finalName);
       GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName,
           context.dependencyTask, context.moveTask,
           hconf, context.currentTask);

http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index ede1bda..66e2d27 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -6575,7 +6575,7 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
       } else {
         queryTmpdir = ctx.getTempDirForPath(dest_path);
       }
-      Utilities.LOG14535.info("createFS for table specifying " + queryTmpdir + 
" from " + dest_path);
+      Utilities.LOG14535.info("create filesink w/DEST_TABLE specifying " + 
queryTmpdir + " from " + dest_path);
       if (dpCtx != null) {
         // set the root of the temporary path where dynamic partition columns 
will populate
         dpCtx.setRootPath(queryTmpdir);
@@ -6644,7 +6644,7 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
 
       isMmTable = MetaStoreUtils.isMmTable(dest_tab.getParameters());
       queryTmpdir = isMmTable ? dest_path : ctx.getTempDirForPath(dest_path);
-      Utilities.LOG14535.info("createFS for partition specifying " + 
queryTmpdir + " from " + dest_path);
+      Utilities.LOG14535.info("create filesink w/DEST_PARTITION specifying " + 
queryTmpdir + " from " + dest_path);
       table_desc = Utilities.getTableDesc(dest_tab);
 
       // Add sorting/bucketing if needed

http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
index ffc9c3e..4635f18 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
@@ -76,14 +76,6 @@ public class ConditionalResolverMergeFiles implements 
ConditionalResolver,
     }
 
     /**
-     * @param dir
-     *          the dir to set
-     */
-    public void setDir(String dir) {
-      this.dir = dir;
-    }
-
-    /**
      * @return the listTasks
      */
     public List<Task<? extends Serializable>> getListTasks() {
@@ -121,8 +113,7 @@ public class ConditionalResolverMergeFiles implements 
ConditionalResolver,
     }
   }
 
-  public List<Task<? extends Serializable>> getTasks(HiveConf conf,
-      Object objCtx) {
+  public List<Task<? extends Serializable>> getTasks(HiveConf conf, Object 
objCtx) {
     ConditionalResolverMergeFilesCtx ctx = (ConditionalResolverMergeFilesCtx) 
objCtx;
     String dirName = ctx.getDir();
 
@@ -179,6 +170,8 @@ public class ConditionalResolverMergeFiles implements 
ConditionalResolver,
           if(lbLevel == 0) {
             // static partition without list bucketing
             long totalSz = getMergeSize(inpFs, dirPath, avgConditionSize);
+            Utilities.LOG14535.info("merge resolve simple case - totalSz " + 
totalSz + " from " + dirPath);
+
             if (totalSz >= 0) { // add the merge job
               setupMapRedWork(conf, work, trgtSize, totalSz);
               resTsks.add(mrTask);
@@ -192,6 +185,7 @@ public class ConditionalResolverMergeFiles implements 
ConditionalResolver,
           }
         }
       } else {
+        Utilities.LOG14535.info("Resolver returning movetask for " + dirPath);
         resTsks.add(mvTask);
       }
     } catch (IOException e) {
@@ -234,6 +228,7 @@ public class ConditionalResolverMergeFiles implements 
ConditionalResolver,
       Task<? extends Serializable> mrTask, Task<? extends Serializable> 
mrAndMvTask, Path dirPath,
       FileSystem inpFs, ConditionalResolverMergeFilesCtx ctx, MapWork work, 
int dpLbLevel)
       throws IOException {
+    Utilities.LOG14535.info("generateActualTasks for " + dirPath);
     DynamicPartitionCtx dpCtx = ctx.getDPCtx();
     // get list of dynamic partitions
     FileStatus[] status = HiveStatsUtils.getFileStatusRecurse(dirPath, 
dpLbLevel, inpFs);
@@ -281,6 +276,7 @@ public class ConditionalResolverMergeFiles implements 
ConditionalResolver,
 
       // add the move task for those partitions that do not need merging
       if (toMove.size() > 0) {
+        // Note: this path should be specific to concatenate; never executed 
in a select query.
         // modify the existing move task as it is already in the candidate 
running tasks
 
         // running the MoveTask and MR task in parallel may
@@ -362,6 +358,7 @@ public class ConditionalResolverMergeFiles implements 
ConditionalResolver,
       long totalSz = 0;
       int numFiles = 0;
       for (FileStatus fStat : fStats) {
+        Utilities.LOG14535.info("Resolver looking at " + fStat.getPath());
         if (fStat.isDir()) {
           AverageSize avgSzDir = getAverageSize(inpFs, fStat.getPath());
           if (avgSzDir.getTotalSize() < 0) {
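
The getMergeSize/getAverageSize calls seen in this resolver hunk amount to a "small files on average" test: if the average file size under the directory is below the configured threshold (hive.merge.smallfiles.avgsize), the total size is returned so the merge job can be sized; otherwise a negative value routes the directory to the plain move task. A self-contained sketch of that check, with the file sizes passed in directly rather than read from the FileSystem:

  // Illustrative only: mirrors the shape of the resolver's merge-size decision.
  final class MergeSizeCheck {
    /** @return total size if the directory should be merged, or -1 otherwise. */
    static long mergeSizeIfNeeded(long[] fileSizes, long avgSizeThreshold) {
      if (fileSizes.length == 0) {
        return -1;                              // nothing to merge
      }
      long total = 0;
      for (long size : fileSizes) {
        total += size;
      }
      long average = total / fileSizes.length;
      return average < avgSizeThreshold ? total : -1;
    }
  }

In the hunk above, a non-negative result feeds setupMapRedWork(conf, work, trgtSize, totalSz) and adds the merge task; a negative one adds the move task instead.
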

http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java
index 7ec1bdd..615c63d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java
@@ -28,6 +28,7 @@ public class FileMergeDesc extends AbstractOperatorDesc {
   private int listBucketingDepth;
   private boolean hasDynamicPartitions;
   private boolean isListBucketingAlterTableConcatenate;
+  private Long mmWriteId;
 
   public FileMergeDesc(DynamicPartitionCtx dynPartCtx, Path outputDir) {
     this.dpCtx = dynPartCtx;
@@ -73,4 +74,12 @@ public class FileMergeDesc extends AbstractOperatorDesc {
   public void setListBucketingAlterTableConcatenate(boolean 
isListBucketingAlterTableConcatenate) {
     this.isListBucketingAlterTableConcatenate = 
isListBucketingAlterTableConcatenate;
   }
+
+  public Long getMmWriteId() {
+    return mmWriteId;
+  }
+
+  public void setMmWriteId(Long mmWriteId) {
+    this.mmWriteId = mmWriteId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index def1c5f..8bef7a9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidWriteIds;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
@@ -182,6 +183,13 @@ public class FileSinkDesc extends AbstractOperatorDesc {
     return linkedFileSink ? parentDir : dirName;
   }
 
+  /** Like getFinalDirName(), but takes MM into account (not DP, LB or buckets). */
+  public Path getMergeInputDirName() {
+    Path root = getFinalDirName();
+    if (mmWriteId == null) return root;
+    return new Path(root, ValidWriteIds.getMmFilePrefix(mmWriteId));
+  }
+
   @Explain(displayName = "table", explainLevels = { Level.USER, Level.DEFAULT, 
Level.EXTENDED })
   public TableDesc getTableInfo() {
     return tableInfo;
@@ -255,7 +263,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
     return mmWriteId != null;
   }
 
-  public long getMmWriteId() {
+  public Long getMmWriteId() {
     return mmWriteId;
   }
 
@@ -485,6 +493,10 @@ public class FileSinkDesc extends AbstractOperatorDesc {
     this.statsTmpDir = statsCollectionTempDir;
   }
 
+  public void setMmWriteId(Long mmWriteId) {
+    this.mmWriteId = mmWriteId;
+  }
+
   public class FileSinkOperatorExplainVectorization extends 
OperatorExplainVectorization {
 
     public FileSinkOperatorExplainVectorization(VectorDesc vectorDesc) {
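
To make the new getMergeInputDirName concrete: for a non-MM sink it is simply the final directory, while for an MM sink the merge reads inside the write-id subdirectory named by ValidWriteIds.getMmFilePrefix. The sketch below substitutes a hypothetical "mm_<writeId>" prefix for that call, since the exact prefix format is owned by ValidWriteIds.

  import org.apache.hadoop.fs.Path;

  final class MergeInputDirSketch {
    static Path mergeInputDir(Path finalDir, Long mmWriteId) {
      if (mmWriteId == null) {
        return finalDir;                             // non-MM: merge over the final dir itself
      }
      // Stand-in for ValidWriteIds.getMmFilePrefix(mmWriteId); the real format may differ.
      String mmSubdir = "mm_" + mmWriteId;
      return new Path(finalDir, mmSubdir);           // MM: merge inside the write-id subdir
    }
  }
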

http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
index 9f498c7..f0b2775 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
 import java.util.List;
 
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
@@ -59,7 +60,7 @@ public class MoveWork implements Serializable {
   public MoveWork() {
   }
 
-  public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs) {
+  private MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs) {
     this.inputs = inputs;
     this.outputs = outputs;
   }
@@ -68,6 +69,8 @@ public class MoveWork implements Serializable {
       final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork,
       boolean checkFileFormat, boolean srcLocal) {
     this(inputs, outputs);
+    Utilities.LOG14535.info("Creating MoveWork " + 
System.identityHashCode(this)
+        + " with " + loadTableWork + "; " + loadFileWork);
     this.loadTableWork = loadTableWork;
     this.loadFileWork = loadFileWork;
     this.checkFileFormat = checkFileFormat;
@@ -77,10 +80,7 @@ public class MoveWork implements Serializable {
   public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
       final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork,
       boolean checkFileFormat) {
-    this(inputs, outputs);
-    this.loadTableWork = loadTableWork;
-    this.loadFileWork = loadFileWork;
-    this.checkFileFormat = checkFileFormat;
+    this(inputs, outputs, loadTableWork, loadFileWork, checkFileFormat, false);
   }
 
   @Explain(displayName = "tables", explainLevels = { Level.USER, 
Level.DEFAULT, Level.EXTENDED })
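
The new constructor log pairs with the "Observing MoveWork" log added to GenMapRedUtils.findMoveTaskForFsopOutput earlier in this patch: since MoveWork has no natural identifier, System.identityHashCode gives a per-instance tag that lets the two log lines be correlated. A minimal, standalone illustration of the pattern (not Hive code):

  // Tagging an object with its identity hash so creation and later lookups line up in the logs.
  final class IdentityTagExample {
    public static void main(String[] args) {
      Object work = new Object();                       // stand-in for a MoveWork instance
      int tag = System.identityHashCode(work);
      System.out.println("Creating work " + tag);       // logged at construction
      System.out.println("Observing work " + tag);      // logged when the planner looks it up
    }
  }
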

http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/test/queries/clientpositive/mm_all.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/mm_all.q 
b/ql/src/test/queries/clientpositive/mm_all.q
index 1f85c48..8ce42a2 100644
--- a/ql/src/test/queries/clientpositive/mm_all.q
+++ b/ql/src/test/queries/clientpositive/mm_all.q
@@ -131,6 +131,42 @@ drop table skew_dp_union_mm;
 
 
 
+set hive.merge.orcfile.stripe.level=true;
+set hive.merge.tezfiles=true;
+set hive.merge.mapfiles=true;
+set hive.merge.mapredfiles=true;
+
+
+create table merge0_mm (id int) stored as orc 
tblproperties('hivecommit'='true');
+
+insert into table merge0_mm select key from intermediate;
+select * from merge0_mm;
+
+set tez.grouping.split-count=1;
+insert into table merge0_mm select key from intermediate;
+set tez.grouping.split-count=0;
+select * from merge0_mm;
+
+drop table merge0_mm;
+
+
+create table merge1_mm (id int) partitioned by (key int) stored as orc 
tblproperties('hivecommit'='true');
+
+insert into table merge1_mm partition (key) select key, key from intermediate;
+select * from merge1_mm;
+
+set tez.grouping.split-count=1;
+insert into table merge1_mm partition (key) select key, key from intermediate;
+set tez.grouping.split-count=0;
+select * from merge1_mm;
+
+drop table merge1_mm;
+
+
+-- TODO: need to include merge+union, but it's broken for now
+
+
+
 
 
 
@@ -140,31 +176,14 @@ drop table skew_dp_union_mm;
 
 
 
---drop table merge_mm;
+
+
 --drop table ctas_mm;
 --
 --
 --create table ctas_mm tblproperties ('hivecommit'='true') as select * from 
src limit 3;
 --
 --
---set hive.merge.mapredfiles=true;
---set hive.merge.sparkfiles=true;
---set hive.merge.tezfiles=true;
---
---CREATE TABLE merge_mm (key INT, value STRING) 
---    PARTITIONED BY (ds STRING, part STRING) STORED AS ORC tblproperties 
('hivecommit'='true');
---
---EXPLAIN
---INSERT OVERWRITE TABLE merge_mm PARTITION (ds='123', part)
---        SELECT key, value, PMOD(HASH(key), 2) as part
---        FROM src;
---
---INSERT OVERWRITE TABLE merge_mm PARTITION (ds='123', part)
---        SELECT key, value, PMOD(HASH(key), 2) as part
---        FROM src;
---
---
---
 ---- TODO load, multi-insert etc
 --
 --

http://git-wip-us.apache.org/repos/asf/hive/blob/eacf9f9b/ql/src/test/queries/clientpositive/mm_current.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/mm_current.q 
b/ql/src/test/queries/clientpositive/mm_current.q
index ceb7a1a..f423b00 100644
--- a/ql/src/test/queries/clientpositive/mm_current.q
+++ b/ql/src/test/queries/clientpositive/mm_current.q
@@ -8,45 +8,17 @@ set hive.tez.auto.reducer.parallelism=false;
 
 drop table intermediate;
 create table intermediate(key int) partitioned by (p int) stored as orc;
-insert into table intermediate partition(p='455') select key from src limit 2;
-insert into table intermediate partition(p='456') select key from src limit 2;
+insert into table intermediate partition(p='455') select distinct key from src 
where key >= 0 order by key desc limit 2;
+insert into table intermediate partition(p='456') select distinct key from src 
where key is not null order by key asc limit 2;
 
 
-set hive.optimize.skewjoin.compiletime = true;
+set hive.merge.orcfile.stripe.level=true;
+set hive.merge.tezfiles=true;
+set hive.merge.mapfiles=true;
+set hive.merge.mapredfiles=true;
 
-create table skew_mm(k1 int, k2 int, k4 int) skewed by (k1, k4) on 
((0,0),(1,1),(2,2),(3,3))
- stored as directories tblproperties ('hivecommit'='false');
 
-insert into table skew_mm 
-select key, key, key from intermediate;
 
-drop table skew_mm;
-
-
-create table skew_mm(k1 int, k2 int, k4 int) skewed by (k1, k4) on 
((0,0),(1,1),(2,2),(3,3))
- stored as directories tblproperties ('hivecommit'='true');
-
-insert into table skew_mm 
-select key, key, key from intermediate;
-
-select * from skew_mm;
-drop table skew_mm;
-
-
-
-
-
-create table skew_mm(k1 int, k2 int, k4 int) partitioned by (k3 int) 
-skewed by (k1, k4) on ((0,0),(1,1),(2,2),(3,3)) stored as directories 
tblproperties ('hivecommit'='true');
-
-insert into table skew_mm partition (k3)
-select key as i, key as j, key as k, key as l from intermediate
-union all 
-select key +1 as i, key +2 as j, key +3 as k, key +4 as l from intermediate;
-
-
-select * from skew_mm;
-drop table skew_mm;
 
 
 
