HIVE-14643 : handle ctas for the MM tables (Sergey Shelukhin)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/af4ff378
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/af4ff378
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/af4ff378

Branch: refs/heads/hive-14535
Commit: af4ff3787d648a9f4c80b5446d6bcd80b1efc69e
Parents: 2474f06
Author: Sergey Shelukhin <ser...@apache.org>
Authored: Mon Oct 17 12:33:31 2016 -0700
Committer: Sergey Shelukhin <ser...@apache.org>
Committed: Mon Oct 17 12:33:31 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |  12 +-
 .../hadoop/hive/ql/exec/FileSinkOperator.java   |  52 ++++--
 .../apache/hadoop/hive/ql/exec/Utilities.java   |  74 +++++---
 .../apache/hadoop/hive/ql/metadata/Hive.java    |   2 +-
 .../hive/ql/optimizer/GenMapRedUtils.java       |   5 +-
 .../optimizer/unionproc/UnionProcFactory.java   |   1 -
 .../hadoop/hive/ql/parse/GenTezUtils.java       |   1 -
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  | 182 +++++++++++--------
 .../hadoop/hive/ql/parse/TaskCompiler.java      | 144 +++++++++------
 .../hadoop/hive/ql/plan/CreateTableDesc.java    |  22 +++
 .../hadoop/hive/ql/plan/FileSinkDesc.java       |  10 +-
 .../apache/hadoop/hive/ql/plan/LoadDesc.java    |   5 +-
 .../hadoop/hive/ql/plan/LoadFileDesc.java       |   2 +-
 .../apache/hadoop/hive/ql/plan/MoveWork.java    |   2 +-
 ql/src/test/queries/clientpositive/mm_all.q     |  30 ++-
 ql/src/test/queries/clientpositive/mm_current.q |  10 +-
 .../results/clientpositive/llap/mm_all.q.out    | 138 +++++++++-----
 .../clientpositive/llap/mm_current.q.out        |  42 +++++
 18 files changed, 463 insertions(+), 271 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index acf570f..bb9eaf5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -4059,8 +4059,18 @@ public class DDLTask extends Task<DDLWork> implements 
Serializable {
       } else {
         db.createTable(tbl, crtTbl.getIfNotExists());
       }
-      if ( crtTbl.isCTAS()) {
+      if (crtTbl.isCTAS()) {
         Table createdTable = db.getTable(tbl.getDbName(), tbl.getTableName());
+        if (crtTbl.getInitialWriteId() != null) {
+          // TODO# this would be retrieved via ACID before the query runs; for 
now we rely on it
+          //       being zero at start; we can't create a write ID before we 
create the table here.
+          long initialWriteId = db.getNextTableWriteId(tbl.getDbName(), 
tbl.getTableName());
+          if (initialWriteId != crtTbl.getInitialWriteId()) {
+            throw new HiveException("Initial write ID mismatch - expected "
+                + crtTbl.getInitialWriteId() + " but got " + initialWriteId);
+          }
+          db.commitMmTableWrite(tbl, initialWriteId);
+        }
         DataContainer dc = new DataContainer(createdTable.getTTable());
         SessionState.get().getLineageState().setLineage(
                 createdTable.getPath(), dc, createdTable.getCols()
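
The new block above boils down to: after the table is created, allocate the first
write ID for it, verify it matches the ID the compiler assumed at plan time, and
finalize that write so the CTAS data becomes visible. Below is a minimal plain-Java
sketch of that check; the MmWriteIds interface is a hypothetical stand-in for the
metastore calls (db.getNextTableWriteId / db.commitMmTableWrite) used in the patch.

// Sketch only: models the MM CTAS write-ID check, not the real Hive API.
final class MmCtasCommitSketch {
  interface MmWriteIds { // stand-in for the metastore-backed calls
    long nextTableWriteId(String db, String table);
    void commitWrite(String db, String table, long writeId);
  }

  static void commitInitialWrite(MmWriteIds ids, String db, String table,
      Long expectedInitialWriteId) {
    if (expectedInitialWriteId == null) return;         // not an MM CTAS
    long allocated = ids.nextTableWriteId(db, table);   // first ID after table creation
    if (allocated != expectedInitialWriteId) {          // compile time assumed this value
      throw new IllegalStateException("Initial write ID mismatch - expected "
          + expectedInitialWriteId + " but got " + allocated);
    }
    ids.commitWrite(db, table, allocated);              // make the CTAS files visible
  }

  public static void main(String[] args) {
    MmWriteIds ids = new MmWriteIds() {
      public long nextTableWriteId(String db, String table) { return 0L; }
      public void commitWrite(String db, String table, long writeId) {
        System.out.println("committed write " + writeId + " for " + db + "." + table);
      }
    };
    commitInitialWrite(ids, "default", "ctas0_mm", 0L);
  }
}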

http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index dda4b51..ef6473a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -112,7 +112,8 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
   protected transient Path parent;
   protected transient HiveOutputFormat<?, ?> hiveOutputFormat;
   protected transient Path specPath;
-  protected transient String childSpecPathDynLinkedPartitions;
+  protected transient String unionPath;
+  protected transient boolean isUnionDp;
   protected transient int dpStartCol; // start column # for DP columns
   protected transient List<String> dpVals; // array of values corresponding to 
DP columns
   protected transient List<Object> dpWritables;
@@ -304,7 +305,12 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
           }
           outPaths[filesIdx] = getTaskOutPath(taskId);
         } else {
-          String subdirPath = 
ValidWriteIds.getMmFilePrefix(conf.getMmWriteId()) + "/" + taskId;
+          String subdirPath = 
ValidWriteIds.getMmFilePrefix(conf.getMmWriteId());
+          if (unionPath != null) {
+            // Create the union directory inside the MM directory.
+            subdirPath += Path.SEPARATOR + unionPath;
+          }
+          subdirPath += Path.SEPARATOR + taskId;
           if (!bDynParts && !isSkewedStoredAsSubDirectories) {
             finalPaths[filesIdx] = getFinalPath(subdirPath, specPath, 
extension);
           } else {
@@ -369,7 +375,6 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
   protected boolean filesCreated = false;
 
   private void initializeSpecPath() {
-    // TODO# special case #N
     // For a query of the type:
     // insert overwrite table T1
     // select * from (subq1 union all subq2)u;
@@ -383,18 +388,25 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
     // and Parent/DynamicPartition/Child_1 respectively.
     // The movetask that follows subQ1 and subQ2 tasks still moves the 
directory
     // 'Parent'
-    if ((!conf.isLinkedFileSink()) || (dpCtx == null)) {
+    boolean isLinked = conf.isLinkedFileSink();
+    if (!isLinked) {
+      // Simple case - no union.
       specPath = conf.getDirName();
-      Utilities.LOG14535.info("Setting up FSOP " + 
System.identityHashCode(this) + " ("
-          + conf.isLinkedFileSink() + ") with " + taskId + " and " + specPath);
-      childSpecPathDynLinkedPartitions = null;
-      return;
+      unionPath = null;
+    } else {
+      isUnionDp = (dpCtx != null);
+      if (conf.isMmTable() || isUnionDp) {
+        // MM tables need custom handling for union suffix; DP tables use 
parent too.
+        specPath = conf.getParentDir();
+        unionPath = conf.getDirName().getName();
+      } else {
+        // For now, keep the old logic for non-MM non-DP union case. Should 
probably be unified.
+        specPath = conf.getDirName();
+        unionPath = null;
+      }
     }
-
-    specPath = conf.getParentDir();
-    childSpecPathDynLinkedPartitions = conf.getDirName().getName();
     Utilities.LOG14535.info("Setting up FSOP " + System.identityHashCode(this) 
+ " ("
-        + conf.isLinkedFileSink() + ") with " + taskId + " and " + specPath);
+        + conf.isLinkedFileSink() + ") with " + taskId + " and " + specPath + 
" + " + unionPath);
   }
 
   /** Kryo ctor. */
@@ -903,9 +915,9 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
    * @throws HiveException
    */
   private FSPaths createNewPaths(String dirName) throws HiveException {
-    FSPaths fsp2 = new FSPaths(specPath, conf.isMmTable()); // TODO# this will 
break
-    fsp2.configureDynPartPath(dirName, childSpecPathDynLinkedPartitions);
-    Utilities.LOG14535.info("creating new paths for " + dirName + ", childSpec 
" + childSpecPathDynLinkedPartitions
+    FSPaths fsp2 = new FSPaths(specPath, conf.isMmTable());
+    fsp2.configureDynPartPath(dirName, !conf.isMmTable() && isUnionDp ? 
unionPath : null);
+    Utilities.LOG14535.info("creating new paths for " + dirName + ", childSpec 
" + unionPath
         + ": tmpPath " + fsp2.getTmpPath() + ", task path " + 
fsp2.getTaskOutputTempPath()/*, new Exception()*/);
     if(!conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) {
       createBucketFiles(fsp2);
@@ -1129,8 +1141,8 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
         }
       }
       if (conf.getMmWriteId() != null) {
-        Utilities.writeMmCommitManifest(commitPaths, specPath, fs, taskId, 
conf.getMmWriteId(),
-            childSpecPathDynLinkedPartitions);
+        Utilities.writeMmCommitManifest(
+            commitPaths, specPath, fs, taskId, conf.getMmWriteId(), unionPath);
       }
       // Only publish stats if this operator's flag was set to gather stats
       if (conf.isGatherStats()) {
@@ -1170,16 +1182,16 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
         String unionSuffix = null;
         DynamicPartitionCtx dpCtx = conf.getDynPartCtx();
         ListBucketingCtx lbCtx = conf.getLbCtx();
-        if (conf.isLinkedFileSink() && (dpCtx != null)) {
+        if (conf.isLinkedFileSink() && (dpCtx != null || conf.isMmTable())) {
           specPath = conf.getParentDir();
-          Utilities.LOG14535.info("Setting specPath to " + specPath + " for 
dynparts");
           unionSuffix = conf.getDirName().getName();
         }
+        Utilities.LOG14535.info("jobCloseOp using specPath " + specPath);
         if (!conf.isMmTable()) {
           Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, 
conf, reporter);
         } else {
           int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(),
-              lbLevels = lbCtx.calculateListBucketingLevel();
+              lbLevels = lbCtx == null ? 0 : 
lbCtx.calculateListBucketingLevel();
           // TODO: why is it stored in both?
           int numBuckets = (conf.getTable() != null) ? 
conf.getTable().getNumBuckets()
               : (dpCtx != null ? dpCtx.getNumBuckets() : 0);
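
The path layout produced by the FSOP changes above can be summarized as
<specPath>/<mm prefix>/[<union child>/]<taskId>. Here is a small stand-alone sketch;
the "mm_<writeId>" directory name is an assumption about what
ValidWriteIds.getMmFilePrefix returns, and the union child name stands in for the
FSOP's per-branch dirName (e.g. the index assigned in GenTezUtils).

// Sketch only: how an MM FSOP's final file path is assembled for the union case.
public final class MmFsopPathSketch {
  static String mmFilePrefix(long writeId) {  // assumed shape of ValidWriteIds.getMmFilePrefix
    return "mm_" + writeId;
  }

  static String finalPath(String specPath, long mmWriteId, String unionChild, String taskId) {
    String subdir = mmFilePrefix(mmWriteId);
    if (unionChild != null) {
      subdir += "/" + unionChild;             // union directory nested inside the MM directory
    }
    subdir += "/" + taskId;
    return specPath + "/" + subdir;
  }

  public static void main(String[] args) {
    // e.g. /warehouse/ctas1_mm/mm_0/1/000000_0
    System.out.println(finalPath("/warehouse/ctas1_mm", 0L, "1", "000000_0"));
  }
}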

http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 49bdd84..accb237 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -3773,26 +3773,23 @@ public final class Utilities {
     }
   }
 
-  private static FileStatus[] getMmDirectoryCandidates(FileSystem fs, Path 
path,
-      int dpLevels, int lbLevels, String unionSuffix, PathFilter filter) 
throws IOException {
+  private static FileStatus[] getMmDirectoryCandidates(FileSystem fs, Path 
path, int dpLevels,
+      int lbLevels, PathFilter filter, long mmWriteId) throws IOException {
     StringBuilder sb = new StringBuilder(path.toUri().getPath());
     for (int i = 0; i < dpLevels + lbLevels; i++) {
       sb.append(Path.SEPARATOR).append("*");
     }
-    if (unionSuffix != null) {
-      sb.append(Path.SEPARATOR).append(unionSuffix);
-    }
-    sb.append(Path.SEPARATOR).append("*"); // TODO: we could add exact mm 
prefix here
+    sb.append(Path.SEPARATOR).append(ValidWriteIds.getMmFilePrefix(mmWriteId));
     Utilities.LOG14535.info("Looking for files via: " + sb.toString());
     Path pathPattern = new Path(path, sb.toString());
     return fs.globStatus(pathPattern, filter);
   }
 
   private static void tryDeleteAllMmFiles(FileSystem fs, Path specPath, Path 
manifestDir,
-      int dpLevels, int lbLevels, String unionSuffix, 
ValidWriteIds.IdPathFilter filter)
-          throws IOException {
+      int dpLevels, int lbLevels, String unionSuffix, 
ValidWriteIds.IdPathFilter filter,
+      long mmWriteId) throws IOException {
     FileStatus[] files = getMmDirectoryCandidates(
-        fs, specPath, dpLevels, lbLevels, unionSuffix, filter);
+        fs, specPath, dpLevels, lbLevels, filter, mmWriteId);
     if (files != null) {
       for (FileStatus status : files) {
         Utilities.LOG14535.info("Deleting " + status.getPath() + " on 
failure");
@@ -3854,7 +3851,8 @@ public final class Utilities {
 
     ValidWriteIds.IdPathFilter filter = new 
ValidWriteIds.IdPathFilter(mmWriteId, true);
     if (!success) {
-      tryDeleteAllMmFiles(fs, specPath, manifestDir, dpLevels, lbLevels, 
unionSuffix, filter);
+      tryDeleteAllMmFiles(fs, specPath, manifestDir, dpLevels, lbLevels,
+          unionSuffix, filter, mmWriteId);
       return;
     }
     FileStatus[] files = HiveStatsUtils.getFileStatusRecurse(manifestDir, 1, 
fs, filter);
@@ -3871,8 +3869,9 @@ public final class Utilities {
     }
 
     Utilities.LOG14535.info("Looking for files in: " + specPath);
-    files = getMmDirectoryCandidates(fs, specPath, dpLevels, lbLevels, 
unionSuffix, filter);
-    ArrayList<FileStatus> results = new ArrayList<>();
+    files = getMmDirectoryCandidates(
+        fs, specPath, dpLevels, lbLevels, filter, mmWriteId);
+    ArrayList<FileStatus> mmDirectories = new ArrayList<>();
     if (files != null) {
       for (FileStatus status : files) {
         Path path = status.getPath();
@@ -3883,7 +3882,7 @@ public final class Utilities {
             tryDelete(fs, path);
           }
         } else {
-          results.add(status);
+          mmDirectories.add(status);
         }
       }
     }
@@ -3901,16 +3900,8 @@ public final class Utilities {
       }
     }
 
-    for (FileStatus status : results) {
-      for (FileStatus child : fs.listStatus(status.getPath())) {
-        Path childPath = child.getPath();
-        if (committed.remove(childPath.toString())) continue; // A good file.
-        Utilities.LOG14535.info("Deleting " + childPath + " that was not 
committed");
-        // We should actually succeed here - if we fail, don't commit the 
query.
-        if (!fs.delete(childPath, true)) {
-          throw new HiveException("Failed to delete an uncommitted path " + 
childPath);
-        }
-      }
+    for (FileStatus status : mmDirectories) {
+      cleanMmDirectory(status.getPath(), fs, unionSuffix, committed);
     }
 
     if (!committed.isEmpty()) {
@@ -3930,12 +3921,12 @@ public final class Utilities {
       }
     }
 
-    if (results.isEmpty()) return;
+    if (mmDirectories.isEmpty()) return;
 
     // TODO: see HIVE-14886 - removeTempOrDuplicateFiles is broken for list 
bucketing,
     //       so maintain parity here by not calling it at all.
     if (lbLevels != 0) return;
-    FileStatus[] finalResults = results.toArray(new 
FileStatus[results.size()]);
+    FileStatus[] finalResults = mmDirectories.toArray(new 
FileStatus[mmDirectories.size()]);
     List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(
         fs, finalResults, dpLevels, mbc == null ? 0 : mbc.numBuckets, hconf);
     // create empty buckets if necessary
@@ -3945,4 +3936,37 @@ public final class Utilities {
     }
   }
 
+  private static void cleanMmDirectory(Path dir, FileSystem fs,
+      String unionSuffix, HashSet<String> committed) throws IOException, 
HiveException {
+    for (FileStatus child : fs.listStatus(dir)) {
+      Path childPath = child.getPath();
+      if (unionSuffix == null) {
+        if (committed.remove(childPath.toString())) continue; // A good file.
+        deleteUncommittedFile(childPath, fs);
+      } else if (!child.isDirectory()) {
+        if (childPath.getName().endsWith(MANIFEST_EXTENSION)) continue;
+        if (committed.contains(childPath.toString())) {
+          throw new HiveException("Union FSOP has commited "
+              + childPath + " outside of union directory" + unionSuffix);
+        }
+        deleteUncommittedFile(childPath, fs);
+      } else if (childPath.getName().equals(unionSuffix)) {
+        // Found the right union directory; treat it as "our" MM directory.
+        cleanMmDirectory(childPath, fs, null, committed);
+      } else {
+        Utilities.LOG14535.info("FSOP for " + unionSuffix
+            + " is ignoring the other side of the union " + 
childPath.getName());
+      }
+    }
+  }
+
+  private static void deleteUncommittedFile(Path childPath, FileSystem fs)
+      throws IOException, HiveException {
+    Utilities.LOG14535.info("Deleting " + childPath + " that was not 
committed");
+    // We should actually succeed here - if we fail, don't commit the query.
+    if (!fs.delete(childPath, true)) {
+      throw new HiveException("Failed to delete an uncommitted path " + 
childPath);
+    }
+  }
+
 }
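
getMmDirectoryCandidates now globs for the exact MM directory instead of appending
the union suffix and a trailing wildcard; cleanMmDirectory then descends into the
matching union child. A rough sketch of the glob pattern follows, again assuming the
MM prefix has the form "mm_<writeId>".

// Sketch only: the pattern shape built by getMmDirectoryCandidates.
public final class MmGlobSketch {
  static String mmDirPattern(String tablePath, int dpLevels, int lbLevels, long mmWriteId) {
    StringBuilder sb = new StringBuilder(tablePath);
    for (int i = 0; i < dpLevels + lbLevels; i++) {
      sb.append("/*");                     // one wildcard per DP / list-bucketing level
    }
    sb.append("/mm_").append(mmWriteId);   // exact MM directory, no trailing wildcard
    return sb.toString();
  }

  public static void main(String[] args) {
    // e.g. /warehouse/t/*/*/mm_3 for two DP levels and write ID 3
    System.out.println(mmDirPattern("/warehouse/t", 2, 0, 3L));
  }
}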

http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java 
b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 9a1c1fa..8da9a80 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1515,7 +1515,7 @@ public class Hive {
   }
 
 
-  private void commitMmTableWrite(Table tbl, Long mmWriteId)
+  public void commitMmTableWrite(Table tbl, Long mmWriteId)
       throws HiveException {
     try {
       getMSC().finalizeTableWrite(tbl.getDbName(), tbl.getTableName(), 
mmWriteId, true);

http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 79ef4d0..bd26854 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -1840,10 +1840,9 @@ public final class GenMapRedUtils {
 
         // Change all the linked file sink descriptors
         if (fileSinkDesc.isLinkedFileSink()) {
-          for (FileSinkDesc fsConf:fileSinkDesc.getLinkedFileSinkDesc()) {
-            fsConf.setParentDir(tmpDir);
+          for (FileSinkDesc fsConf : fileSinkDesc.getLinkedFileSinkDesc()) {
             fsConf.setDirName(new Path(tmpDir, fsConf.getDirName().getName()));
-            Utilities.LOG14535.info("createMoveTask setting tmpDir for 
LinkedFileSink chDir " + fsConf.getDirName() + "; new parent " + tmpDir + ", 
dest was " + fileSinkDesc.getDestPath());
+            Utilities.LOG14535.info("createMoveTask setting tmpDir for 
LinkedFileSink chDir " + fsConf.getDirName() + "; dest was " + 
fileSinkDesc.getDestPath());
           }
         } else {
           fileSinkDesc.setDirName(tmpDir);

http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java
index 7f7d192..3c37709 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java
@@ -223,7 +223,6 @@ public final class UnionProcFactory {
           FileSinkDesc fileSinkDesc = (FileSinkDesc) 
fileSinkOp.getConf().clone();
           fileSinkDesc.setDirName(new Path(parentDirName, 
parent.getIdentifier()));
           fileSinkDesc.setLinkedFileSink(true);
-          fileSinkDesc.setParentDir(parentDirName);
           Utilities.LOG14535.info("Created LinkedFileSink for union " + 
fileSinkDesc.getDirName() + "; parent " + parentDirName);
           parent.setChildOperators(null);
           Operator<? extends OperatorDesc> tmpFileSinkOp =

http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index 5c67fe2..e1da05c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -307,7 +307,6 @@ public class GenTezUtils {
         desc.setDirName(new Path(path, "" + linked.size()));
         Utilities.LOG14535.info("removing union - new desc with " + 
desc.getDirName() + "; parent " + path);
         desc.setLinkedFileSink(true);
-        desc.setParentDir(path);
         desc.setLinkedFileSinkDesc(linked);
       }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 905c000..62faf89 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -244,6 +244,7 @@ import com.google.common.collect.Sets;
 
 public class SemanticAnalyzer extends BaseSemanticAnalyzer {
 
+
   public static final String DUMMY_DATABASE = "_dummy_database";
   public static final String DUMMY_TABLE = "_dummy_table";
   public static final String SUBQUERY_TAG_1 = "-subquery1";
@@ -6532,7 +6533,7 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
     LoadTableDesc ltd = null;
     ListBucketingCtx lbCtx = null;
     Map<String, String> partSpec = null;
-    boolean isMmTable = false;
+    boolean isMmTable = false, isMmCtas = false;
     Long mmWriteId = null;
 
     switch (dest_type.intValue()) {
@@ -6676,26 +6677,6 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
     case QBMetaData.DEST_DFS_FILE: {
       dest_path = new Path(qbm.getDestFileForAlias(dest));
 
-      if (isLocal) {
-        // for local directory - we always write to map-red intermediate
-        // store and then copy to local fs
-        queryTmpdir = ctx.getMRTmpPath();
-      } else {
-        // otherwise write to the file system implied by the directory
-        // no copy is required. we may want to revisit this policy in future
-
-        try {
-          Path qPath = FileUtils.makeQualified(dest_path, conf);
-          queryTmpdir = ctx.getTempDirForPath(qPath);
-        } catch (Exception e) {
-          throw new SemanticException("Error creating temporary folder on: "
-              + dest_path, e);
-        }
-      }
-      String cols = "";
-      String colTypes = "";
-      ArrayList<ColumnInfo> colInfos = inputRR.getColumnInfos();
-
       // CTAS case: the file output format and serde are defined by the create
       // table command rather than taking the default value
       List<FieldSchema> field_schemas = null;
@@ -6705,64 +6686,39 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
         field_schemas = new ArrayList<FieldSchema>();
         destTableIsTemporary = tblDesc.isTemporary();
         destTableIsMaterialization = tblDesc.isMaterialization();
+        if (MetaStoreUtils.isMmTable(tblDesc.getTblProps())) {
+          isMmTable = isMmCtas = true;
+          // TODO# this should really get current ACID txn; assuming ACID 
works correctly the txn
+          //       should have been opened to create the ACID table. For now 
use the first ID.
+          mmWriteId = 0l;
+          tblDesc.setInitialWriteId(mmWriteId);
+        }
       } else if (viewDesc != null) {
         field_schemas = new ArrayList<FieldSchema>();
         destTableIsTemporary = false;
       }
 
-      boolean first = true;
-      for (ColumnInfo colInfo : colInfos) {
-        String[] nm = inputRR.reverseLookup(colInfo.getInternalName());
-
-        if (nm[1] != null) { // non-null column alias
-          colInfo.setAlias(nm[1]);
-        }
-
-        String colName = colInfo.getInternalName();  //default column name
-        if (field_schemas != null) {
-          FieldSchema col = new FieldSchema();
-          if (!("".equals(nm[0])) && nm[1] != null) {
-            colName = unescapeIdentifier(colInfo.getAlias()).toLowerCase(); // 
remove ``
-          }
-          colName = fixCtasColumnName(colName);
-          col.setName(colName);
-          String typeName = colInfo.getType().getTypeName();
-          // CTAS should NOT create a VOID type
-          if (typeName.equals(serdeConstants.VOID_TYPE_NAME)) {
-              throw new SemanticException(ErrorMsg.CTAS_CREATES_VOID_TYPE
-              .getMsg(colName));
-          }
-          col.setType(typeName);
-          field_schemas.add(col);
-        }
-
-        if (!first) {
-          cols = cols.concat(",");
-          colTypes = colTypes.concat(":");
-        }
-
-        first = false;
-        cols = cols.concat(colName);
-
-        // Replace VOID type with string when the output is a temp table or
-        // local files.
-        // A VOID type can be generated under the query:
-        //
-        // select NULL from tt;
-        // or
-        // insert overwrite local directory "abc" select NULL from tt;
-        //
-        // where there is no column type to which the NULL value should be
-        // converted.
-        //
-        String tName = colInfo.getType().getTypeName();
-        if (tName.equals(serdeConstants.VOID_TYPE_NAME)) {
-          colTypes = colTypes.concat(serdeConstants.STRING_TYPE_NAME);
-        } else {
-          colTypes = colTypes.concat(tName);
+      if (isLocal) {
+        assert !isMmTable;
+        // for local directory - we always write to map-red intermediate
+        // store and then copy to local fs
+        queryTmpdir = ctx.getMRTmpPath();
+      } else {
+        // otherwise write to the file system implied by the directory
+        // no copy is required. we may want to revisit this policy in future
+        try {
+          Path qPath = FileUtils.makeQualified(dest_path, conf);
+          queryTmpdir = isMmTable ? qPath : ctx.getTempDirForPath(qPath);
+          Utilities.LOG14535.info("Setting query directory " + queryTmpdir + " 
from " + dest_path + " (" + isMmTable + ")");
+        } catch (Exception e) { 
+          throw new SemanticException("Error creating temporary folder on: "
+              + dest_path, e);
         }
       }
 
+      ColsAndTypes ct = deriveFileSinkColTypes(inputRR, field_schemas);
+      String cols = ct.cols, colTypes = ct.colTypes;
+
       // update the create table descriptor with the resulting schema.
       if (tblDesc != null) {
         tblDesc.setCols(new ArrayList<FieldSchema>(field_schemas));
@@ -6779,8 +6735,9 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
       }
 
       boolean isDfsDir = (dest_type.intValue() == QBMetaData.DEST_DFS_FILE);
-      loadFileWork.add(new LoadFileDesc(tblDesc, viewDesc, queryTmpdir, 
dest_path, isDfsDir, cols,
-          colTypes));
+      // Create LFD even for MM CTAS - it's a no-op move, but it still seems to be used for stats.
+      loadFileWork.add(new LoadFileDesc(tblDesc, viewDesc,
+          queryTmpdir, dest_path, isDfsDir, cols, colTypes));
 
       if (tblDesc == null) {
         if (viewDesc != null) {
@@ -6866,6 +6823,10 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
         dest_path, currentTableId, destTableIsAcid, destTableIsTemporary,
         destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS,
         canBeMerged, mmWriteId);
+    if (isMmCtas) {
+      // Add FSD so that the LoadTask compilation could fix up its path to 
avoid the move.
+      tableDesc.setWriter(fileSinkDesc);
+    }
 
     Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
         fileSinkDesc, fsRS, input), inputRR);
@@ -6897,6 +6858,64 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
     return output;
   }
 
+  private ColsAndTypes deriveFileSinkColTypes(
+      RowResolver inputRR, List<FieldSchema> field_schemas) throws 
SemanticException {
+    ColsAndTypes result = new ColsAndTypes("", "");
+    ArrayList<ColumnInfo> colInfos = inputRR.getColumnInfos();
+    boolean first = true;
+    for (ColumnInfo colInfo : colInfos) {
+      String[] nm = inputRR.reverseLookup(colInfo.getInternalName());
+
+      if (nm[1] != null) { // non-null column alias
+        colInfo.setAlias(nm[1]);
+      }
+
+      String colName = colInfo.getInternalName();  //default column name
+      if (field_schemas != null) {
+        FieldSchema col = new FieldSchema();
+        if (!("".equals(nm[0])) && nm[1] != null) {
+          colName = unescapeIdentifier(colInfo.getAlias()).toLowerCase(); // 
remove ``
+        }
+        colName = fixCtasColumnName(colName);
+        col.setName(colName);
+        String typeName = colInfo.getType().getTypeName();
+        // CTAS should NOT create a VOID type
+        if (typeName.equals(serdeConstants.VOID_TYPE_NAME)) {
+            throw new 
SemanticException(ErrorMsg.CTAS_CREATES_VOID_TYPE.getMsg(colName));
+        }
+        col.setType(typeName);
+        field_schemas.add(col);
+      }
+
+      if (!first) {
+        result.cols = result.cols.concat(",");
+        result.colTypes = result.colTypes.concat(":");
+      }
+
+      first = false;
+      result.cols = result.cols.concat(colName);
+
+      // Replace VOID type with string when the output is a temp table or
+      // local files.
+      // A VOID type can be generated under the query:
+      //
+      // select NULL from tt;
+      // or
+      // insert overwrite local directory "abc" select NULL from tt;
+      //
+      // where there is no column type to which the NULL value should be
+      // converted.
+      //
+      String tName = colInfo.getType().getTypeName();
+      if (tName.equals(serdeConstants.VOID_TYPE_NAME)) {
+        result.colTypes = 
result.colTypes.concat(serdeConstants.STRING_TYPE_NAME);
+      } else {
+        result.colTypes = result.colTypes.concat(tName);
+      }
+    }
+    return result;
+  }
+
   private static Long getMmWriteId(Table tbl, boolean isMmTable) throws 
HiveException {
     if (!isMmTable) return null;
     // Get the next write ID for this table. We will prefix files with this 
write ID.
@@ -10145,7 +10164,7 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
         if (partitions != null) {
           for (Partition partn : partitions) {
             // inputs.add(new ReadEntity(partn)); // is this needed at all?
-             LOG.info("XXX: adding part: "+partn);
+        LOG.info("XXX: adding part: "+partn);
             outputs.add(new WriteEntity(partn, 
WriteEntity.WriteType.DDL_NO_LOCK));
           }
         }
@@ -11809,7 +11828,7 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
         }
       }
 
-      if(location != null && location.length() != 0) {
+      if (location != null && location.length() != 0) {
         Path locPath = new Path(location);
         FileSystem curFs = null;
         FileStatus locStats = null;
@@ -11818,7 +11837,7 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
           if(curFs != null) {
             locStats = curFs.getFileStatus(locPath);
           }
-          if(locStats != null && locStats.isDir()) {
+          if (locStats != null && locStats.isDir()) {
             FileStatus[] lStats = curFs.listStatus(locPath);
             if(lStats != null && lStats.length != 0) {
               throw new 
SemanticException(ErrorMsg.CTAS_LOCATION_NONEMPTY.getMsg(location));
@@ -11835,14 +11854,13 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
       }
 
       tblProps = addDefaultProperties(tblProps);
-
       tableDesc = new CreateTableDesc(qualifiedTabName[0], dbDotTab, isExt, 
isTemporary, cols,
           partCols, bucketCols, sortCols, numBuckets, 
rowFormatParams.fieldDelim,
           rowFormatParams.fieldEscape, rowFormatParams.collItemDelim, 
rowFormatParams.mapKeyDelim,
           rowFormatParams.lineDelim, comment, storageFormat.getInputFormat(),
           storageFormat.getOutputFormat(), location, storageFormat.getSerde(),
           storageFormat.getStorageHandler(), storageFormat.getSerdeProps(), 
tblProps, ifNotExists,
-         skewedColNames, skewedValues, true, primaryKeys, foreignKeys);
+    skewedColNames, skewedValues, true, primaryKeys, foreignKeys);
       tableDesc.setMaterialization(isMaterialization);
       tableDesc.setStoredAsSubDirectories(storedAsDirs);
       tableDesc.setNullFormat(rowFormatParams.nullFormat);
@@ -13177,4 +13195,12 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
     this.loadFileWork = loadFileWork;
   }
 
+  private static final class ColsAndTypes {
+    public ColsAndTypes(String cols, String colTypes) {
+      this.cols = cols;
+      this.colTypes = colTypes;
+    }
+    public String cols;
+    public String colTypes;
+  }
 }
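
The deriveFileSinkColTypes helper extracted above keeps the old behaviour: column
names are joined with "," and types with ":", and VOID (e.g. a literal NULL column)
is written out as string. A compact stand-alone illustration with hypothetical
column names follows.

// Sketch only: the cols / colTypes strings a file sink descriptor ends up with.
import java.util.LinkedHashMap;
import java.util.Map;

public final class ColsAndTypesSketch {
  public static void main(String[] args) {
    LinkedHashMap<String, String> schema = new LinkedHashMap<>();
    schema.put("key", "int");
    schema.put("p", "void");               // e.g. "select key, NULL from ..."
    StringBuilder cols = new StringBuilder(), colTypes = new StringBuilder();
    boolean first = true;
    for (Map.Entry<String, String> e : schema.entrySet()) {
      if (!first) { cols.append(","); colTypes.append(":"); }
      first = false;
      cols.append(e.getKey());
      colTypes.append("void".equals(e.getValue()) ? "string" : e.getValue());
    }
    System.out.println(cols + " / " + colTypes);  // prints: key,p / int:string
  }
}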

http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index 363d41a..e177925 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -34,7 +34,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
+import org.apache.hadoop.hive.common.ValidWriteIds;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.ql.Context;
@@ -62,6 +64,7 @@ import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
 import org.apache.hadoop.hive.ql.plan.CreateViewDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
@@ -205,7 +208,7 @@ public abstract class TaskCompiler {
       }
     } else if (!isCStats) {
       for (LoadTableDesc ltd : loadTableWork) {
-        // TODO# move task is created here; handle MM tables
+        // TODO#  What is this path? special case for MM?
         Task<MoveWork> tsk = TaskFactory.get(new MoveWork(null, null, ltd, 
null, false), conf);
         mvTask.add(tsk);
         // Check to see if we are stale'ing any indexes and auto-update them 
if we want
@@ -224,45 +227,15 @@ public abstract class TaskCompiler {
         }
       }
 
-      boolean oneLoadFile = true;
+      boolean oneLoadFileForCtas = true;
       for (LoadFileDesc lfd : loadFileWork) {
         if (pCtx.getQueryProperties().isCTAS() || 
pCtx.getQueryProperties().isMaterializedView()) {
-          assert (oneLoadFile); // should not have more than 1 load file for
-          // CTAS
-          // make the movetask's destination directory the table's destination.
-          Path location;
-          String loc = pCtx.getQueryProperties().isCTAS() ?
-                  pCtx.getCreateTable().getLocation() : 
pCtx.getCreateViewDesc().getLocation();
-          if (loc == null) {
-            // get the default location
-            Path targetPath;
-            try {
-              String protoName = null;
-              if (pCtx.getQueryProperties().isCTAS()) {
-                protoName = pCtx.getCreateTable().getTableName();
-              } else if (pCtx.getQueryProperties().isMaterializedView()) {
-                protoName = pCtx.getCreateViewDesc().getViewName();
-              }
-              String[] names = Utilities.getDbTableName(protoName);
-              if (!db.databaseExists(names[0])) {
-                throw new SemanticException("ERROR: The database " + names[0]
-                    + " does not exist.");
-              }
-              Warehouse wh = new Warehouse(conf);
-              targetPath = wh.getTablePath(db.getDatabase(names[0]), names[1]);
-            } catch (HiveException e) {
-              throw new SemanticException(e);
-            } catch (MetaException e) {
-              throw new SemanticException(e);
-            }
-
-            location = targetPath;
-          } else {
-            location = new Path(loc);
+          if (!oneLoadFileForCtas) { // should not have more than 1 load file 
for CTAS.
+            throw new SemanticException(
+                "One query is not expected to contain multiple CTAS loads 
statements");
           }
-          lfd.setTargetDir(location);
-
-          oneLoadFile = false;
+          setLoadFileLocation(pCtx, lfd);
+          oneLoadFileForCtas = false;
         }
         mvTask.add(TaskFactory.get(new MoveWork(null, null, null, lfd, false), 
conf));
       }
@@ -288,28 +261,7 @@ public abstract class TaskCompiler {
      * a column stats task instead of a fetch task to persist stats to the 
metastore.
      */
     if (isCStats || !pCtx.getColumnStatsAutoGatherContexts().isEmpty()) {
-      Set<Task<? extends Serializable>> leafTasks = new LinkedHashSet<Task<? 
extends Serializable>>();
-      getLeafTasks(rootTasks, leafTasks);
-      if (isCStats) {
-        genColumnStatsTask(pCtx.getAnalyzeRewrite(), loadFileWork, leafTasks, 
outerQueryLimit, 0);
-      } else {
-        for (ColumnStatsAutoGatherContext columnStatsAutoGatherContext : pCtx
-            .getColumnStatsAutoGatherContexts()) {
-          if (!columnStatsAutoGatherContext.isInsertInto()) {
-            
genColumnStatsTask(columnStatsAutoGatherContext.getAnalyzeRewrite(),
-                columnStatsAutoGatherContext.getLoadFileWork(), leafTasks, 
outerQueryLimit, 0);
-          } else {
-            int numBitVector;
-            try {
-              numBitVector = 
HiveStatsUtils.getNumBitVectorsForNDVEstimation(conf);
-            } catch (Exception e) {
-              throw new SemanticException(e.getMessage());
-            }
-            
genColumnStatsTask(columnStatsAutoGatherContext.getAnalyzeRewrite(),
-                columnStatsAutoGatherContext.getLoadFileWork(), leafTasks, 
outerQueryLimit, numBitVector);
-          }
-        }
-      }
+      createColumnStatsTasks(pCtx, rootTasks, loadFileWork, isCStats, 
outerQueryLimit);
     }
 
     decideExecMode(rootTasks, ctx, globalLimitCtx);
@@ -356,6 +308,80 @@ public abstract class TaskCompiler {
     }
   }
 
+  private void setLoadFileLocation(
+      final ParseContext pCtx, LoadFileDesc lfd) throws SemanticException {
+    // CTAS; make the movetask's destination directory the table's destination.
+    Long mmWriteIdForCtas = null;
+    FileSinkDesc dataSinkForCtas = null;
+    String loc = null;
+    if (pCtx.getQueryProperties().isCTAS()) {
+      CreateTableDesc ctd = pCtx.getCreateTable();
+      dataSinkForCtas = ctd.getAndUnsetWriter();
+      mmWriteIdForCtas = ctd.getInitialWriteId();
+      loc = ctd.getLocation();
+    } else {
+      loc = pCtx.getCreateViewDesc().getLocation();
+    }
+    Path location = (loc == null) ? getDefaultCtasLocation(pCtx) : new 
Path(loc);
+    if (mmWriteIdForCtas != null) {
+      dataSinkForCtas.setDirName(location);
+      location = new Path(location, 
ValidWriteIds.getMmFilePrefix(mmWriteIdForCtas));
+      lfd.setSourcePath(location);
+      Utilities.LOG14535.info("Setting MM CTAS to  " + location);
+    }
+    Utilities.LOG14535.info("Location for LFD is being set to " + location + 
"; moving from " + lfd.getSourcePath());
+    lfd.setTargetDir(location);
+  }
+
+  private void createColumnStatsTasks(final ParseContext pCtx,
+      final List<Task<? extends Serializable>> rootTasks,
+      List<LoadFileDesc> loadFileWork, boolean isCStats, int outerQueryLimit)
+      throws SemanticException {
+    Set<Task<? extends Serializable>> leafTasks = new LinkedHashSet<Task<? 
extends Serializable>>();
+    getLeafTasks(rootTasks, leafTasks);
+    if (isCStats) {
+      genColumnStatsTask(pCtx.getAnalyzeRewrite(), loadFileWork, leafTasks, 
outerQueryLimit, 0);
+    } else {
+      for (ColumnStatsAutoGatherContext columnStatsAutoGatherContext : pCtx
+          .getColumnStatsAutoGatherContexts()) {
+        if (!columnStatsAutoGatherContext.isInsertInto()) {
+          genColumnStatsTask(columnStatsAutoGatherContext.getAnalyzeRewrite(),
+              columnStatsAutoGatherContext.getLoadFileWork(), leafTasks, 
outerQueryLimit, 0);
+        } else {
+          int numBitVector;
+          try {
+            numBitVector = 
HiveStatsUtils.getNumBitVectorsForNDVEstimation(conf);
+          } catch (Exception e) {
+            throw new SemanticException(e.getMessage());
+          }
+          genColumnStatsTask(columnStatsAutoGatherContext.getAnalyzeRewrite(),
+              columnStatsAutoGatherContext.getLoadFileWork(), leafTasks, 
outerQueryLimit, numBitVector);
+        }
+      }
+    }
+  }
+
+  private Path getDefaultCtasLocation(final ParseContext pCtx) throws 
SemanticException {
+    try {
+      String protoName = null;
+      if (pCtx.getQueryProperties().isCTAS()) {
+        protoName = pCtx.getCreateTable().getTableName();
+      } else if (pCtx.getQueryProperties().isMaterializedView()) {
+        protoName = pCtx.getCreateViewDesc().getViewName();
+      }
+      String[] names = Utilities.getDbTableName(protoName);
+      if (!db.databaseExists(names[0])) {
+        throw new SemanticException("ERROR: The database " + names[0] + " does 
not exist.");
+      }
+      Warehouse wh = new Warehouse(conf);
+      return wh.getTablePath(db.getDatabase(names[0]), names[1]);
+    } catch (HiveException e) {
+      throw new SemanticException(e);
+    } catch (MetaException e) {
+      throw new SemanticException(e);
+    }
+  }
+
   private void patchUpAfterCTASorMaterializedView(final List<Task<? extends 
Serializable>>  rootTasks,
                                                   final HashSet<WriteEntity> 
outputs,
                                                   Task<? extends Serializable> 
createTask) {
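
For an MM CTAS, setLoadFileLocation above makes the move a no-op: the CTAS FSOP's
dirName is pointed directly at the table location, and the LoadFileDesc source and
target both become the mm_<writeId> subdirectory underneath it. A tiny sketch of
that fix-up follows (directory prefix shape assumed, paths hypothetical).

// Sketch only: where the MM CTAS data lands and what the "move" degenerates to.
public final class MmCtasLocationSketch {
  public static void main(String[] args) {
    String tableLocation = "/warehouse/ctas0_mm";  // default or user-specified CTAS location
    long mmWriteId = 0L;                           // initial write ID assumed at compile time
    String fsopDir = tableLocation;                // FSOP now writes under the table location
    String loadDir = tableLocation + "/mm_" + mmWriteId;  // LFD source == target
    System.out.println("FSOP dir: " + fsopDir);
    System.out.println("Load source/target: " + loadDir);
  }
}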

http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
index eafba21..7609068 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
@@ -94,6 +94,10 @@ public class CreateTableDesc extends DDLDesc implements 
Serializable {
   private boolean isCTAS = false;
   List<SQLPrimaryKey> primaryKeys;
   List<SQLForeignKey> foreignKeys;
+  private Long initialWriteId;
+  // The FSOP configuration for the FSOP that is going to write initial data 
during ctas.
+  // This is not needed beyond compilation, so it is transient.
+  private transient FileSinkDesc writer;
 
   public CreateTableDesc() {
   }
@@ -825,5 +829,23 @@ public class CreateTableDesc extends DDLDesc implements 
Serializable {
     return tbl;
   }
 
+  public void setInitialWriteId(Long mmWriteId) {
+    this.initialWriteId = mmWriteId;
+  }
+
+  public Long getInitialWriteId() {
+    return initialWriteId;
+  }
+
+  public FileSinkDesc getAndUnsetWriter() {
+    FileSinkDesc fsd = writer;
+    writer = null;
+    return fsd;
+  }
+
+  public void setWriter(FileSinkDesc writer) {
+    this.writer = writer;
+  }
+
 
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index 8bef7a9..d7d6e38 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -82,7 +82,6 @@ public class FileSinkDesc extends AbstractOperatorDesc {
   // the sub-queries write to sub-directories of a common directory. So, the 
file sink
   // descriptors for subq1 and subq2 are linked.
   private boolean linkedFileSink = false;
-  private Path parentDir;
   transient private List<FileSinkDesc> linkedFileSinkDesc;
 
   private boolean statsReliable;
@@ -152,7 +151,6 @@ public class FileSinkDesc extends AbstractOperatorDesc {
     ret.setStaticSpec(staticSpec);
     ret.setStatsAggPrefix(statsKeyPref);
     ret.setLinkedFileSink(linkedFileSink);
-    ret.setParentDir(parentDir);
     ret.setLinkedFileSinkDesc(linkedFileSinkDesc);
     ret.setStatsReliable(statsReliable);
     ret.setDpSortState(dpSortState);
@@ -180,7 +178,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
   }
 
   public Path getFinalDirName() {
-    return linkedFileSink ? parentDir : dirName;
+    return linkedFileSink ? dirName.getParent() : dirName;
   }
 
   /** getFinalDirName that takes into account MM, but not DP, LB or buckets. */
@@ -395,11 +393,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
   }
 
   public Path getParentDir() {
-    return parentDir;
-  }
-
-  public void setParentDir(Path parentDir) {
-    this.parentDir = parentDir;
+    return dirName.getParent();
   }
 
   public boolean isStatsReliable() {
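
With the stored parentDir field gone, a linked file sink's parent is now derived
from dirName itself. A trivial sketch of the equivalence, using java.nio paths in
place of Hadoop's Path:

// Sketch only: getParentDir()/getFinalDirName() now just take dirName's parent.
import java.nio.file.Path;
import java.nio.file.Paths;

public final class LinkedFsParentSketch {
  public static void main(String[] args) {
    Path dirName = Paths.get("/tmp/hive/union_parent/1");  // per-branch dir of a linked FSOP
    boolean linkedFileSink = true;
    Path finalDir = linkedFileSink ? dirName.getParent() : dirName;
    System.out.println(finalDir);  // /tmp/hive/union_parent
  }
}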

http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java
index bcd3125..d46f71e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java
@@ -42,5 +42,8 @@ public class LoadDesc implements Serializable {
   public Path getSourcePath() {
     return sourcePath;
   }
-  
+
+  public void setSourcePath(Path path) {
+    this.sourcePath = path;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
index 064a864..7670ef2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
@@ -60,7 +60,7 @@ public class LoadFileDesc extends LoadDesc implements 
Serializable {
       final boolean isDfsDir, final String columns, final String columnTypes) {
 
     super(sourcePath);
-    Utilities.LOG14535.info("creating LFD from " + sourcePath + " to " + 
targetDir/*, new Exception()*/);
+    Utilities.LOG14535.info("creating LFD from " + sourcePath + " to " + 
targetDir, new Exception());
     this.targetDir = targetDir;
     this.isDfsDir = isDfsDir;
     this.columns = columns;

http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
index f0b2775..3ada134 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
@@ -70,7 +70,7 @@ public class MoveWork implements Serializable {
       boolean checkFileFormat, boolean srcLocal) {
     this(inputs, outputs);
     Utilities.LOG14535.info("Creating MoveWork " + 
System.identityHashCode(this)
-        + " with " + loadTableWork + "; " + loadFileWork);
+        + " with " + loadTableWork + "; " + loadFileWork, new Exception());
     this.loadTableWork = loadTableWork;
     this.loadFileWork = loadFileWork;
     this.checkFileFormat = checkFileFormat;

http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/test/queries/clientpositive/mm_all.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/mm_all.q 
b/ql/src/test/queries/clientpositive/mm_all.q
index 8ce42a2..8163d2f 100644
--- a/ql/src/test/queries/clientpositive/mm_all.q
+++ b/ql/src/test/queries/clientpositive/mm_all.q
@@ -162,30 +162,22 @@ select * from merge1_mm;
 
 drop table merge1_mm;
 
+-- TODO: need to include merge+union+DP, but it's broken for now
 
--- TODO: need to include merge+union, but it's broken for now
 
+drop table ctas0_mm;
+create table ctas0_mm tblproperties ('hivecommit'='true') as select * from 
intermediate;
+select * from ctas0_mm;
+drop table ctas0_mm;
 
+drop table ctas1_mm;
+create table ctas1_mm tblproperties ('hivecommit'='true') as
+  select * from intermediate union all select * from intermediate;
+select * from ctas1_mm;
+drop table ctas1_mm;
 
 
 
-
--- future
-
-
-
-
-
-
-
---drop table ctas_mm;
---
---
---create table ctas_mm tblproperties ('hivecommit'='true') as select * from 
src limit 3;
---
---
----- TODO load, multi-insert etc
---
---
+-- TODO load, multi-insert, buckets
 
 drop table intermediate;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/test/queries/clientpositive/mm_current.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/mm_current.q 
b/ql/src/test/queries/clientpositive/mm_current.q
index f423b00..f2d353f 100644
--- a/ql/src/test/queries/clientpositive/mm_current.q
+++ b/ql/src/test/queries/clientpositive/mm_current.q
@@ -12,15 +12,13 @@ insert into table intermediate partition(p='455') select 
distinct key from src w
 insert into table intermediate partition(p='456') select distinct key from src 
where key is not null order by key asc limit 2;
 
 
-set hive.merge.orcfile.stripe.level=true;
-set hive.merge.tezfiles=true;
-set hive.merge.mapfiles=true;
-set hive.merge.mapredfiles=true;
-
-
 
+create table ctas1_mm tblproperties ('hivecommit'='true') as
+  select * from intermediate union all select * from intermediate;
 
 
+select * from ctas1_mm;
+drop table ctas1_mm;
 
 drop table intermediate;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/test/results/clientpositive/llap/mm_all.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/mm_all.q.out 
b/ql/src/test/results/clientpositive/llap/mm_all.q.out
index f8001c2..93716de 100644
--- a/ql/src/test/results/clientpositive/llap/mm_all.q.out
+++ b/ql/src/test/results/clientpositive/llap/mm_all.q.out
@@ -154,11 +154,11 @@ POSTHOOK: Input: default@part_mm@key_mm=456
 10     455
 10     455
 97     455
-97     456
 97     455
-98     455
+97     456
 98     456
 98     455
+98     455
 PREHOOK: query: drop table part_mm
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@part_mm
@@ -903,59 +903,105 @@ POSTHOOK: query: drop table merge1_mm
 POSTHOOK: type: DROPTABLE
 POSTHOOK: Input: default@merge1_mm
 POSTHOOK: Output: default@merge1_mm
-PREHOOK: query: -- TODO: need to include merge+union, but it's broken for now
-
-
-
-
-
-
--- future
-
-
-
+PREHOOK: query: -- TODO: need to include merge+union+DP, but it's broken for 
now
 
 
+drop table ctas0_mm
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: -- TODO: need to include merge+union+DP, but it's broken for 
now
 
 
---drop table ctas_mm;
---
---
---create table ctas_mm tblproperties ('hivecommit'='true') as select * from 
src limit 3;
---
---
----- TODO load, multi-insert etc
---
---
+drop table ctas0_mm
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table ctas0_mm tblproperties ('hivecommit'='true') as 
select * from intermediate
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Output: database:default
+PREHOOK: Output: default@ctas0_mm
+POSTHOOK: query: create table ctas0_mm tblproperties ('hivecommit'='true') as 
select * from intermediate
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@ctas0_mm
+POSTHOOK: Lineage: ctas0_mm.key SIMPLE 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+POSTHOOK: Lineage: ctas0_mm.p SIMPLE 
[(intermediate)intermediate.FieldSchema(name:p, type:int, comment:null), ]
+PREHOOK: query: select * from ctas0_mm
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ctas0_mm
+#### A masked pattern was here ####
+POSTHOOK: query: select * from ctas0_mm
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ctas0_mm
+#### A masked pattern was here ####
+98     455
+97     455
+0      456
+10     456
+PREHOOK: query: drop table ctas0_mm
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@ctas0_mm
+PREHOOK: Output: default@ctas0_mm
+POSTHOOK: query: drop table ctas0_mm
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@ctas0_mm
+POSTHOOK: Output: default@ctas0_mm
+PREHOOK: query: drop table ctas1_mm
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table ctas1_mm
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table ctas1_mm tblproperties ('hivecommit'='true') as
+  select * from intermediate union all select * from intermediate
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Output: database:default
+PREHOOK: Output: default@ctas1_mm
+POSTHOOK: query: create table ctas1_mm tblproperties ('hivecommit'='true') as
+  select * from intermediate union all select * from intermediate
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@ctas1_mm
+POSTHOOK: Lineage: ctas1_mm.key EXPRESSION 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+POSTHOOK: Lineage: ctas1_mm.p EXPRESSION 
[(intermediate)intermediate.FieldSchema(name:p, type:int, comment:null), ]
+PREHOOK: query: select * from ctas1_mm
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ctas1_mm
+#### A masked pattern was here ####
+POSTHOOK: query: select * from ctas1_mm
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ctas1_mm
+#### A masked pattern was here ####
+98     455
+97     455
+0      456
+10     456
+98     455
+97     455
+0      456
+10     456
+PREHOOK: query: drop table ctas1_mm
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@ctas1_mm
+PREHOOK: Output: default@ctas1_mm
+POSTHOOK: query: drop table ctas1_mm
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@ctas1_mm
+POSTHOOK: Output: default@ctas1_mm
+PREHOOK: query: -- TODO load, multi-insert, buckets
 
 drop table intermediate
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@intermediate
 PREHOOK: Output: default@intermediate
-POSTHOOK: query: -- TODO: need to include merge+union, but it's broken for now
-
-
-
-
-
-
--- future
-
-
-
-
-
-
-
---drop table ctas_mm;
---
---
---create table ctas_mm tblproperties ('hivecommit'='true') as select * from 
src limit 3;
---
---
----- TODO load, multi-insert etc
---
---
+POSTHOOK: query: -- TODO load, multi-insert, buckets
 
 drop table intermediate
 POSTHOOK: type: DROPTABLE

http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/test/results/clientpositive/llap/mm_current.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/mm_current.q.out 
b/ql/src/test/results/clientpositive/llap/mm_current.q.out
index 4d28c63..5b51fa3 100644
--- a/ql/src/test/results/clientpositive/llap/mm_current.q.out
+++ b/ql/src/test/results/clientpositive/llap/mm_current.q.out
@@ -28,6 +28,48 @@ POSTHOOK: type: QUERY
 POSTHOOK: Input: default@src
 POSTHOOK: Output: default@intermediate@p=456
 POSTHOOK: Lineage: intermediate PARTITION(p=456).key EXPRESSION 
[(src)src.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: create table ctas1_mm tblproperties ('hivecommit'='true') as
+  select * from intermediate union all select * from intermediate
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Output: database:default
+PREHOOK: Output: default@ctas1_mm
+POSTHOOK: query: create table ctas1_mm tblproperties ('hivecommit'='true') as
+  select * from intermediate union all select * from intermediate
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@ctas1_mm
+POSTHOOK: Lineage: ctas1_mm.key EXPRESSION 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+POSTHOOK: Lineage: ctas1_mm.p EXPRESSION 
[(intermediate)intermediate.FieldSchema(name:p, type:int, comment:null), ]
+PREHOOK: query: select * from ctas1_mm
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ctas1_mm
+#### A masked pattern was here ####
+POSTHOOK: query: select * from ctas1_mm
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ctas1_mm
+#### A masked pattern was here ####
+98     455
+97     455
+0      456
+10     456
+98     455
+97     455
+0      456
+10     456
+PREHOOK: query: drop table ctas1_mm
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@ctas1_mm
+PREHOOK: Output: default@ctas1_mm
+POSTHOOK: query: drop table ctas1_mm
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@ctas1_mm
+POSTHOOK: Output: default@ctas1_mm
 PREHOOK: query: drop table intermediate
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@intermediate
