HIVE-14643: Handle CTAS for MM tables (Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/af4ff378 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/af4ff378 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/af4ff378 Branch: refs/heads/hive-14535 Commit: af4ff3787d648a9f4c80b5446d6bcd80b1efc69e Parents: 2474f06 Author: Sergey Shelukhin <ser...@apache.org> Authored: Mon Oct 17 12:33:31 2016 -0700 Committer: Sergey Shelukhin <ser...@apache.org> Committed: Mon Oct 17 12:33:31 2016 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 12 +- .../hadoop/hive/ql/exec/FileSinkOperator.java | 52 ++++-- .../apache/hadoop/hive/ql/exec/Utilities.java | 74 +++++--- .../apache/hadoop/hive/ql/metadata/Hive.java | 2 +- .../hive/ql/optimizer/GenMapRedUtils.java | 5 +- .../optimizer/unionproc/UnionProcFactory.java | 1 - .../hadoop/hive/ql/parse/GenTezUtils.java | 1 - .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 182 +++++++++++-------- .../hadoop/hive/ql/parse/TaskCompiler.java | 144 +++++++++------ .../hadoop/hive/ql/plan/CreateTableDesc.java | 22 +++ .../hadoop/hive/ql/plan/FileSinkDesc.java | 10 +- .../apache/hadoop/hive/ql/plan/LoadDesc.java | 5 +- .../hadoop/hive/ql/plan/LoadFileDesc.java | 2 +- .../apache/hadoop/hive/ql/plan/MoveWork.java | 2 +- ql/src/test/queries/clientpositive/mm_all.q | 30 ++- ql/src/test/queries/clientpositive/mm_current.q | 10 +- .../results/clientpositive/llap/mm_all.q.out | 138 +++++++++----- .../clientpositive/llap/mm_current.q.out | 42 +++++ 18 files changed, 463 insertions(+), 271 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index acf570f..bb9eaf5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -4059,8 +4059,18 @@ public class DDLTask extends Task<DDLWork> implements Serializable { } else { db.createTable(tbl, crtTbl.getIfNotExists()); } - if ( crtTbl.isCTAS()) { + if (crtTbl.isCTAS()) { Table createdTable = db.getTable(tbl.getDbName(), tbl.getTableName()); + if (crtTbl.getInitialWriteId() != null) { + // TODO# this would be retrieved via ACID before the query runs; for now we rely on it + // being zero at start; we can't create a write ID before we create the table here. 
+ long initialWriteId = db.getNextTableWriteId(tbl.getDbName(), tbl.getTableName()); + if (initialWriteId != crtTbl.getInitialWriteId()) { + throw new HiveException("Initial write ID mismatch - expected " + + crtTbl.getInitialWriteId() + " but got " + initialWriteId); + } + db.commitMmTableWrite(tbl, initialWriteId); + } DataContainer dc = new DataContainer(createdTable.getTTable()); SessionState.get().getLineageState().setLineage( createdTable.getPath(), dc, createdTable.getCols() http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index dda4b51..ef6473a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -112,7 +112,8 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements protected transient Path parent; protected transient HiveOutputFormat<?, ?> hiveOutputFormat; protected transient Path specPath; - protected transient String childSpecPathDynLinkedPartitions; + protected transient String unionPath; + protected transient boolean isUnionDp; protected transient int dpStartCol; // start column # for DP columns protected transient List<String> dpVals; // array of values corresponding to DP columns protected transient List<Object> dpWritables; @@ -304,7 +305,12 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements } outPaths[filesIdx] = getTaskOutPath(taskId); } else { - String subdirPath = ValidWriteIds.getMmFilePrefix(conf.getMmWriteId()) + "/" + taskId; + String subdirPath = ValidWriteIds.getMmFilePrefix(conf.getMmWriteId()); + if (unionPath != null) { + // Create the union directory inside the MM directory. + subdirPath += Path.SEPARATOR + unionPath; + } + subdirPath += Path.SEPARATOR + taskId; if (!bDynParts && !isSkewedStoredAsSubDirectories) { finalPaths[filesIdx] = getFinalPath(subdirPath, specPath, extension); } else { @@ -369,7 +375,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements protected boolean filesCreated = false; private void initializeSpecPath() { - // TODO# special case #N // For a query of the type: // insert overwrite table T1 // select * from (subq1 union all subq2)u; @@ -383,18 +388,25 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements // and Parent/DynamicPartition/Child_1 respectively. // The movetask that follows subQ1 and subQ2 tasks still moves the directory // 'Parent' - if ((!conf.isLinkedFileSink()) || (dpCtx == null)) { + boolean isLinked = conf.isLinkedFileSink(); + if (!isLinked) { + // Simple case - no union. specPath = conf.getDirName(); - Utilities.LOG14535.info("Setting up FSOP " + System.identityHashCode(this) + " (" - + conf.isLinkedFileSink() + ") with " + taskId + " and " + specPath); - childSpecPathDynLinkedPartitions = null; - return; + unionPath = null; + } else { + isUnionDp = (dpCtx != null); + if (conf.isMmTable() || isUnionDp) { + // MM tables need custom handling for union suffix; DP tables use parent too. + specPath = conf.getParentDir(); + unionPath = conf.getDirName().getName(); + } else { + // For now, keep the old logic for non-MM non-DP union case. Should probably be unified. 
+ specPath = conf.getDirName(); + unionPath = null; + } } - - specPath = conf.getParentDir(); - childSpecPathDynLinkedPartitions = conf.getDirName().getName(); Utilities.LOG14535.info("Setting up FSOP " + System.identityHashCode(this) + " (" - + conf.isLinkedFileSink() + ") with " + taskId + " and " + specPath); + + conf.isLinkedFileSink() + ") with " + taskId + " and " + specPath + " + " + unionPath); } /** Kryo ctor. */ @@ -903,9 +915,9 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements * @throws HiveException */ private FSPaths createNewPaths(String dirName) throws HiveException { - FSPaths fsp2 = new FSPaths(specPath, conf.isMmTable()); // TODO# this will break - fsp2.configureDynPartPath(dirName, childSpecPathDynLinkedPartitions); - Utilities.LOG14535.info("creating new paths for " + dirName + ", childSpec " + childSpecPathDynLinkedPartitions + FSPaths fsp2 = new FSPaths(specPath, conf.isMmTable()); + fsp2.configureDynPartPath(dirName, !conf.isMmTable() && isUnionDp ? unionPath : null); + Utilities.LOG14535.info("creating new paths for " + dirName + ", childSpec " + unionPath + ": tmpPath " + fsp2.getTmpPath() + ", task path " + fsp2.getTaskOutputTempPath()/*, new Exception()*/); if(!conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) { createBucketFiles(fsp2); @@ -1129,8 +1141,8 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements } } if (conf.getMmWriteId() != null) { - Utilities.writeMmCommitManifest(commitPaths, specPath, fs, taskId, conf.getMmWriteId(), - childSpecPathDynLinkedPartitions); + Utilities.writeMmCommitManifest( + commitPaths, specPath, fs, taskId, conf.getMmWriteId(), unionPath); } // Only publish stats if this operator's flag was set to gather stats if (conf.isGatherStats()) { @@ -1170,16 +1182,16 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements String unionSuffix = null; DynamicPartitionCtx dpCtx = conf.getDynPartCtx(); ListBucketingCtx lbCtx = conf.getLbCtx(); - if (conf.isLinkedFileSink() && (dpCtx != null)) { + if (conf.isLinkedFileSink() && (dpCtx != null || conf.isMmTable())) { specPath = conf.getParentDir(); - Utilities.LOG14535.info("Setting specPath to " + specPath + " for dynparts"); unionSuffix = conf.getDirName().getName(); } + Utilities.LOG14535.info("jobCloseOp using specPath " + specPath); if (!conf.isMmTable()) { Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf, reporter); } else { int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(), - lbLevels = lbCtx.calculateListBucketingLevel(); + lbLevels = lbCtx == null ? 0 : lbCtx.calculateListBucketingLevel(); // TODO: why is it stored in both? int numBuckets = (conf.getTable() != null) ? conf.getTable().getNumBuckets() : (dpCtx != null ? 
dpCtx.getNumBuckets() : 0); http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 49bdd84..accb237 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -3773,26 +3773,23 @@ public final class Utilities { } } - private static FileStatus[] getMmDirectoryCandidates(FileSystem fs, Path path, - int dpLevels, int lbLevels, String unionSuffix, PathFilter filter) throws IOException { + private static FileStatus[] getMmDirectoryCandidates(FileSystem fs, Path path, int dpLevels, + int lbLevels, PathFilter filter, long mmWriteId) throws IOException { StringBuilder sb = new StringBuilder(path.toUri().getPath()); for (int i = 0; i < dpLevels + lbLevels; i++) { sb.append(Path.SEPARATOR).append("*"); } - if (unionSuffix != null) { - sb.append(Path.SEPARATOR).append(unionSuffix); - } - sb.append(Path.SEPARATOR).append("*"); // TODO: we could add exact mm prefix here + sb.append(Path.SEPARATOR).append(ValidWriteIds.getMmFilePrefix(mmWriteId)); Utilities.LOG14535.info("Looking for files via: " + sb.toString()); Path pathPattern = new Path(path, sb.toString()); return fs.globStatus(pathPattern, filter); } private static void tryDeleteAllMmFiles(FileSystem fs, Path specPath, Path manifestDir, - int dpLevels, int lbLevels, String unionSuffix, ValidWriteIds.IdPathFilter filter) - throws IOException { + int dpLevels, int lbLevels, String unionSuffix, ValidWriteIds.IdPathFilter filter, + long mmWriteId) throws IOException { FileStatus[] files = getMmDirectoryCandidates( - fs, specPath, dpLevels, lbLevels, unionSuffix, filter); + fs, specPath, dpLevels, lbLevels, filter, mmWriteId); if (files != null) { for (FileStatus status : files) { Utilities.LOG14535.info("Deleting " + status.getPath() + " on failure"); @@ -3854,7 +3851,8 @@ public final class Utilities { ValidWriteIds.IdPathFilter filter = new ValidWriteIds.IdPathFilter(mmWriteId, true); if (!success) { - tryDeleteAllMmFiles(fs, specPath, manifestDir, dpLevels, lbLevels, unionSuffix, filter); + tryDeleteAllMmFiles(fs, specPath, manifestDir, dpLevels, lbLevels, + unionSuffix, filter, mmWriteId); return; } FileStatus[] files = HiveStatsUtils.getFileStatusRecurse(manifestDir, 1, fs, filter); @@ -3871,8 +3869,9 @@ public final class Utilities { } Utilities.LOG14535.info("Looking for files in: " + specPath); - files = getMmDirectoryCandidates(fs, specPath, dpLevels, lbLevels, unionSuffix, filter); - ArrayList<FileStatus> results = new ArrayList<>(); + files = getMmDirectoryCandidates( + fs, specPath, dpLevels, lbLevels, filter, mmWriteId); + ArrayList<FileStatus> mmDirectories = new ArrayList<>(); if (files != null) { for (FileStatus status : files) { Path path = status.getPath(); @@ -3883,7 +3882,7 @@ public final class Utilities { tryDelete(fs, path); } } else { - results.add(status); + mmDirectories.add(status); } } } @@ -3901,16 +3900,8 @@ public final class Utilities { } } - for (FileStatus status : results) { - for (FileStatus child : fs.listStatus(status.getPath())) { - Path childPath = child.getPath(); - if (committed.remove(childPath.toString())) continue; // A good file. 
- Utilities.LOG14535.info("Deleting " + childPath + " that was not committed"); - // We should actually succeed here - if we fail, don't commit the query. - if (!fs.delete(childPath, true)) { - throw new HiveException("Failed to delete an uncommitted path " + childPath); - } - } + for (FileStatus status : mmDirectories) { + cleanMmDirectory(status.getPath(), fs, unionSuffix, committed); } if (!committed.isEmpty()) { @@ -3930,12 +3921,12 @@ public final class Utilities { } } - if (results.isEmpty()) return; + if (mmDirectories.isEmpty()) return; // TODO: see HIVE-14886 - removeTempOrDuplicateFiles is broken for list bucketing, // so maintain parity here by not calling it at all. if (lbLevels != 0) return; - FileStatus[] finalResults = results.toArray(new FileStatus[results.size()]); + FileStatus[] finalResults = mmDirectories.toArray(new FileStatus[mmDirectories.size()]); List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles( fs, finalResults, dpLevels, mbc == null ? 0 : mbc.numBuckets, hconf); // create empty buckets if necessary @@ -3945,4 +3936,37 @@ public final class Utilities { } } + private static void cleanMmDirectory(Path dir, FileSystem fs, + String unionSuffix, HashSet<String> committed) throws IOException, HiveException { + for (FileStatus child : fs.listStatus(dir)) { + Path childPath = child.getPath(); + if (unionSuffix == null) { + if (committed.remove(childPath.toString())) continue; // A good file. + deleteUncommitedFile(childPath, fs); + } else if (!child.isDirectory()) { + if (childPath.getName().endsWith(MANIFEST_EXTENSION)) continue; + if (committed.contains(childPath.toString())) { + throw new HiveException("Union FSOP has commited " + + childPath + " outside of union directory" + unionSuffix); + } + deleteUncommitedFile(childPath, fs); + } else if (childPath.getName().equals(unionSuffix)) { + // Found the right union directory; treat it as "our" MM directory. + cleanMmDirectory(childPath, fs, null, committed); + } else { + Utilities.LOG14535.info("FSOP for " + unionSuffix + + " is ignoring the other side of the union " + childPath.getName()); + } + } + } + + private static void deleteUncommitedFile(Path childPath, FileSystem fs) + throws IOException, HiveException { + Utilities.LOG14535.info("Deleting " + childPath + " that was not committed"); + // We should actually succeed here - if we fail, don't commit the query. 
+ if (!fs.delete(childPath, true)) { + throw new HiveException("Failed to delete an uncommitted path " + childPath); + } + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 9a1c1fa..8da9a80 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -1515,7 +1515,7 @@ public class Hive { } - private void commitMmTableWrite(Table tbl, Long mmWriteId) + public void commitMmTableWrite(Table tbl, Long mmWriteId) throws HiveException { try { getMSC().finalizeTableWrite(tbl.getDbName(), tbl.getTableName(), mmWriteId, true); http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index 79ef4d0..bd26854 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -1840,10 +1840,9 @@ public final class GenMapRedUtils { // Change all the linked file sink descriptors if (fileSinkDesc.isLinkedFileSink()) { - for (FileSinkDesc fsConf:fileSinkDesc.getLinkedFileSinkDesc()) { - fsConf.setParentDir(tmpDir); + for (FileSinkDesc fsConf : fileSinkDesc.getLinkedFileSinkDesc()) { fsConf.setDirName(new Path(tmpDir, fsConf.getDirName().getName())); - Utilities.LOG14535.info("createMoveTask setting tmpDir for LinkedFileSink chDir " + fsConf.getDirName() + "; new parent " + tmpDir + ", dest was " + fileSinkDesc.getDestPath()); + Utilities.LOG14535.info("createMoveTask setting tmpDir for LinkedFileSink chDir " + fsConf.getDirName() + "; dest was " + fileSinkDesc.getDestPath()); } } else { fileSinkDesc.setDirName(tmpDir); http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java index 7f7d192..3c37709 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java @@ -223,7 +223,6 @@ public final class UnionProcFactory { FileSinkDesc fileSinkDesc = (FileSinkDesc) fileSinkOp.getConf().clone(); fileSinkDesc.setDirName(new Path(parentDirName, parent.getIdentifier())); fileSinkDesc.setLinkedFileSink(true); - fileSinkDesc.setParentDir(parentDirName); Utilities.LOG14535.info("Created LinkedFileSink for union " + fileSinkDesc.getDirName() + "; parent " + parentDirName); parent.setChildOperators(null); Operator<? 
extends OperatorDesc> tmpFileSinkOp = http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java index 5c67fe2..e1da05c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java @@ -307,7 +307,6 @@ public class GenTezUtils { desc.setDirName(new Path(path, "" + linked.size())); Utilities.LOG14535.info("removing union - new desc with " + desc.getDirName() + "; parent " + path); desc.setLinkedFileSink(true); - desc.setParentDir(path); desc.setLinkedFileSinkDesc(linked); } http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 905c000..62faf89 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -244,6 +244,7 @@ import com.google.common.collect.Sets; public class SemanticAnalyzer extends BaseSemanticAnalyzer { + public static final String DUMMY_DATABASE = "_dummy_database"; public static final String DUMMY_TABLE = "_dummy_table"; public static final String SUBQUERY_TAG_1 = "-subquery1"; @@ -6532,7 +6533,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { LoadTableDesc ltd = null; ListBucketingCtx lbCtx = null; Map<String, String> partSpec = null; - boolean isMmTable = false; + boolean isMmTable = false, isMmCtas = false; Long mmWriteId = null; switch (dest_type.intValue()) { @@ -6676,26 +6677,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { case QBMetaData.DEST_DFS_FILE: { dest_path = new Path(qbm.getDestFileForAlias(dest)); - if (isLocal) { - // for local directory - we always write to map-red intermediate - // store and then copy to local fs - queryTmpdir = ctx.getMRTmpPath(); - } else { - // otherwise write to the file system implied by the directory - // no copy is required. we may want to revisit this policy in future - - try { - Path qPath = FileUtils.makeQualified(dest_path, conf); - queryTmpdir = ctx.getTempDirForPath(qPath); - } catch (Exception e) { - throw new SemanticException("Error creating temporary folder on: " - + dest_path, e); - } - } - String cols = ""; - String colTypes = ""; - ArrayList<ColumnInfo> colInfos = inputRR.getColumnInfos(); - // CTAS case: the file output format and serde are defined by the create // table command rather than taking the default value List<FieldSchema> field_schemas = null; @@ -6705,64 +6686,39 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { field_schemas = new ArrayList<FieldSchema>(); destTableIsTemporary = tblDesc.isTemporary(); destTableIsMaterialization = tblDesc.isMaterialization(); + if (MetaStoreUtils.isMmTable(tblDesc.getTblProps())) { + isMmTable = isMmCtas = true; + // TODO# this should really get current ACID txn; assuming ACID works correctly the txn + // should have been opened to create the ACID table. For now use the first ID. 
+ mmWriteId = 0l; + tblDesc.setInitialWriteId(mmWriteId); + } } else if (viewDesc != null) { field_schemas = new ArrayList<FieldSchema>(); destTableIsTemporary = false; } - boolean first = true; - for (ColumnInfo colInfo : colInfos) { - String[] nm = inputRR.reverseLookup(colInfo.getInternalName()); - - if (nm[1] != null) { // non-null column alias - colInfo.setAlias(nm[1]); - } - - String colName = colInfo.getInternalName(); //default column name - if (field_schemas != null) { - FieldSchema col = new FieldSchema(); - if (!("".equals(nm[0])) && nm[1] != null) { - colName = unescapeIdentifier(colInfo.getAlias()).toLowerCase(); // remove `` - } - colName = fixCtasColumnName(colName); - col.setName(colName); - String typeName = colInfo.getType().getTypeName(); - // CTAS should NOT create a VOID type - if (typeName.equals(serdeConstants.VOID_TYPE_NAME)) { - throw new SemanticException(ErrorMsg.CTAS_CREATES_VOID_TYPE - .getMsg(colName)); - } - col.setType(typeName); - field_schemas.add(col); - } - - if (!first) { - cols = cols.concat(","); - colTypes = colTypes.concat(":"); - } - - first = false; - cols = cols.concat(colName); - - // Replace VOID type with string when the output is a temp table or - // local files. - // A VOID type can be generated under the query: - // - // select NULL from tt; - // or - // insert overwrite local directory "abc" select NULL from tt; - // - // where there is no column type to which the NULL value should be - // converted. - // - String tName = colInfo.getType().getTypeName(); - if (tName.equals(serdeConstants.VOID_TYPE_NAME)) { - colTypes = colTypes.concat(serdeConstants.STRING_TYPE_NAME); - } else { - colTypes = colTypes.concat(tName); + if (isLocal) { + assert !isMmTable; + // for local directory - we always write to map-red intermediate + // store and then copy to local fs + queryTmpdir = ctx.getMRTmpPath(); + } else { + // otherwise write to the file system implied by the directory + // no copy is required. we may want to revisit this policy in future + try { + Path qPath = FileUtils.makeQualified(dest_path, conf); + queryTmpdir = isMmTable ? qPath : ctx.getTempDirForPath(qPath); + Utilities.LOG14535.info("Setting query directory " + queryTmpdir + " from " + dest_path + " (" + isMmTable + ")"); + } catch (Exception e) { + throw new SemanticException("Error creating temporary folder on: " + + dest_path, e); } } + ColsAndTypes ct = deriveFileSinkColTypes(inputRR, field_schemas); + String cols = ct.cols, colTypes = ct.colTypes; + // update the create table descriptor with the resulting schema. if (tblDesc != null) { tblDesc.setCols(new ArrayList<FieldSchema>(field_schemas)); @@ -6779,8 +6735,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } boolean isDfsDir = (dest_type.intValue() == QBMetaData.DEST_DFS_FILE); - loadFileWork.add(new LoadFileDesc(tblDesc, viewDesc, queryTmpdir, dest_path, isDfsDir, cols, - colTypes)); + // Create LFD even for MM CTAS - it's a no-op move, but it still seems to be uses for stats. + loadFileWork.add(new LoadFileDesc(tblDesc, viewDesc, + queryTmpdir, dest_path, isDfsDir, cols, colTypes)); if (tblDesc == null) { if (viewDesc != null) { @@ -6866,6 +6823,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { dest_path, currentTableId, destTableIsAcid, destTableIsTemporary, destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS, canBeMerged, mmWriteId); + if (isMmCtas) { + // Add FSD so that the LoadTask compilation could fix up its path to avoid the move. 
+ tableDesc.setWriter(fileSinkDesc); + } Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild( fileSinkDesc, fsRS, input), inputRR); @@ -6897,6 +6858,64 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { return output; } + private ColsAndTypes deriveFileSinkColTypes( + RowResolver inputRR, List<FieldSchema> field_schemas) throws SemanticException { + ColsAndTypes result = new ColsAndTypes("", ""); + ArrayList<ColumnInfo> colInfos = inputRR.getColumnInfos(); + boolean first = true; + for (ColumnInfo colInfo : colInfos) { + String[] nm = inputRR.reverseLookup(colInfo.getInternalName()); + + if (nm[1] != null) { // non-null column alias + colInfo.setAlias(nm[1]); + } + + String colName = colInfo.getInternalName(); //default column name + if (field_schemas != null) { + FieldSchema col = new FieldSchema(); + if (!("".equals(nm[0])) && nm[1] != null) { + colName = unescapeIdentifier(colInfo.getAlias()).toLowerCase(); // remove `` + } + colName = fixCtasColumnName(colName); + col.setName(colName); + String typeName = colInfo.getType().getTypeName(); + // CTAS should NOT create a VOID type + if (typeName.equals(serdeConstants.VOID_TYPE_NAME)) { + throw new SemanticException(ErrorMsg.CTAS_CREATES_VOID_TYPE.getMsg(colName)); + } + col.setType(typeName); + field_schemas.add(col); + } + + if (!first) { + result.cols = result.cols.concat(","); + result.colTypes = result.colTypes.concat(":"); + } + + first = false; + result.cols = result.cols.concat(colName); + + // Replace VOID type with string when the output is a temp table or + // local files. + // A VOID type can be generated under the query: + // + // select NULL from tt; + // or + // insert overwrite local directory "abc" select NULL from tt; + // + // where there is no column type to which the NULL value should be + // converted. + // + String tName = colInfo.getType().getTypeName(); + if (tName.equals(serdeConstants.VOID_TYPE_NAME)) { + result.colTypes = result.colTypes.concat(serdeConstants.STRING_TYPE_NAME); + } else { + result.colTypes = result.colTypes.concat(tName); + } + } + return result; + } + private static Long getMmWriteId(Table tbl, boolean isMmTable) throws HiveException { if (!isMmTable) return null; // Get the next write ID for this table. We will prefix files with this write ID. @@ -10145,7 +10164,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (partitions != null) { for (Partition partn : partitions) { // inputs.add(new ReadEntity(partn)); // is this needed at all? 
- LOG.info("XXX: adding part: "+partn); + LOG.info("XXX: adding part: "+partn); outputs.add(new WriteEntity(partn, WriteEntity.WriteType.DDL_NO_LOCK)); } } @@ -11809,7 +11828,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } } - if(location != null && location.length() != 0) { + if (location != null && location.length() != 0) { Path locPath = new Path(location); FileSystem curFs = null; FileStatus locStats = null; @@ -11818,7 +11837,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if(curFs != null) { locStats = curFs.getFileStatus(locPath); } - if(locStats != null && locStats.isDir()) { + if (locStats != null && locStats.isDir()) { FileStatus[] lStats = curFs.listStatus(locPath); if(lStats != null && lStats.length != 0) { throw new SemanticException(ErrorMsg.CTAS_LOCATION_NONEMPTY.getMsg(location)); @@ -11835,14 +11854,13 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } tblProps = addDefaultProperties(tblProps); - tableDesc = new CreateTableDesc(qualifiedTabName[0], dbDotTab, isExt, isTemporary, cols, partCols, bucketCols, sortCols, numBuckets, rowFormatParams.fieldDelim, rowFormatParams.fieldEscape, rowFormatParams.collItemDelim, rowFormatParams.mapKeyDelim, rowFormatParams.lineDelim, comment, storageFormat.getInputFormat(), storageFormat.getOutputFormat(), location, storageFormat.getSerde(), storageFormat.getStorageHandler(), storageFormat.getSerdeProps(), tblProps, ifNotExists, - skewedColNames, skewedValues, true, primaryKeys, foreignKeys); + skewedColNames, skewedValues, true, primaryKeys, foreignKeys); tableDesc.setMaterialization(isMaterialization); tableDesc.setStoredAsSubDirectories(storedAsDirs); tableDesc.setNullFormat(rowFormatParams.nullFormat); @@ -13177,4 +13195,12 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { this.loadFileWork = loadFileWork; } + private static final class ColsAndTypes { + public ColsAndTypes(String cols, String colTypes) { + this.cols = cols; + this.colTypes = colTypes; + } + public String cols; + public String colTypes; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java index 363d41a..e177925 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java @@ -34,7 +34,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.HiveStatsUtils; +import org.apache.hadoop.hive.common.ValidWriteIds; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.ql.Context; @@ -62,6 +64,7 @@ import org.apache.hadoop.hive.ql.plan.CreateTableDesc; import org.apache.hadoop.hive.ql.plan.CreateViewDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.FetchWork; +import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.LoadFileDesc; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.plan.MoveWork; @@ -205,7 +208,7 @@ public abstract class TaskCompiler { } } else if (!isCStats) { for 
(LoadTableDesc ltd : loadTableWork) { - // TODO# move task is created here; handle MM tables + // TODO# What is this path? special case for MM? Task<MoveWork> tsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf); mvTask.add(tsk); // Check to see if we are stale'ing any indexes and auto-update them if we want @@ -224,45 +227,15 @@ public abstract class TaskCompiler { } } - boolean oneLoadFile = true; + boolean oneLoadFileForCtas = true; for (LoadFileDesc lfd : loadFileWork) { if (pCtx.getQueryProperties().isCTAS() || pCtx.getQueryProperties().isMaterializedView()) { - assert (oneLoadFile); // should not have more than 1 load file for - // CTAS - // make the movetask's destination directory the table's destination. - Path location; - String loc = pCtx.getQueryProperties().isCTAS() ? - pCtx.getCreateTable().getLocation() : pCtx.getCreateViewDesc().getLocation(); - if (loc == null) { - // get the default location - Path targetPath; - try { - String protoName = null; - if (pCtx.getQueryProperties().isCTAS()) { - protoName = pCtx.getCreateTable().getTableName(); - } else if (pCtx.getQueryProperties().isMaterializedView()) { - protoName = pCtx.getCreateViewDesc().getViewName(); - } - String[] names = Utilities.getDbTableName(protoName); - if (!db.databaseExists(names[0])) { - throw new SemanticException("ERROR: The database " + names[0] - + " does not exist."); - } - Warehouse wh = new Warehouse(conf); - targetPath = wh.getTablePath(db.getDatabase(names[0]), names[1]); - } catch (HiveException e) { - throw new SemanticException(e); - } catch (MetaException e) { - throw new SemanticException(e); - } - - location = targetPath; - } else { - location = new Path(loc); + if (!oneLoadFileForCtas) { // should not have more than 1 load file for CTAS. + throw new SemanticException( + "One query is not expected to contain multiple CTAS loads statements"); } - lfd.setTargetDir(location); - - oneLoadFile = false; + setLoadFileLocation(pCtx, lfd); + oneLoadFileForCtas = false; } mvTask.add(TaskFactory.get(new MoveWork(null, null, null, lfd, false), conf)); } @@ -288,28 +261,7 @@ public abstract class TaskCompiler { * a column stats task instead of a fetch task to persist stats to the metastore. */ if (isCStats || !pCtx.getColumnStatsAutoGatherContexts().isEmpty()) { - Set<Task<? extends Serializable>> leafTasks = new LinkedHashSet<Task<? 
extends Serializable>>(); - getLeafTasks(rootTasks, leafTasks); - if (isCStats) { - genColumnStatsTask(pCtx.getAnalyzeRewrite(), loadFileWork, leafTasks, outerQueryLimit, 0); - } else { - for (ColumnStatsAutoGatherContext columnStatsAutoGatherContext : pCtx - .getColumnStatsAutoGatherContexts()) { - if (!columnStatsAutoGatherContext.isInsertInto()) { - genColumnStatsTask(columnStatsAutoGatherContext.getAnalyzeRewrite(), - columnStatsAutoGatherContext.getLoadFileWork(), leafTasks, outerQueryLimit, 0); - } else { - int numBitVector; - try { - numBitVector = HiveStatsUtils.getNumBitVectorsForNDVEstimation(conf); - } catch (Exception e) { - throw new SemanticException(e.getMessage()); - } - genColumnStatsTask(columnStatsAutoGatherContext.getAnalyzeRewrite(), - columnStatsAutoGatherContext.getLoadFileWork(), leafTasks, outerQueryLimit, numBitVector); - } - } - } + createColumnStatsTasks(pCtx, rootTasks, loadFileWork, isCStats, outerQueryLimit); } decideExecMode(rootTasks, ctx, globalLimitCtx); @@ -356,6 +308,80 @@ public abstract class TaskCompiler { } } + private void setLoadFileLocation( + final ParseContext pCtx, LoadFileDesc lfd) throws SemanticException { + // CTAS; make the movetask's destination directory the table's destination. + Long mmWriteIdForCtas = null; + FileSinkDesc dataSinkForCtas = null; + String loc = null; + if (pCtx.getQueryProperties().isCTAS()) { + CreateTableDesc ctd = pCtx.getCreateTable(); + dataSinkForCtas = ctd.getAndUnsetWriter(); + mmWriteIdForCtas = ctd.getInitialWriteId(); + loc = ctd.getLocation(); + } else { + loc = pCtx.getCreateViewDesc().getLocation(); + } + Path location = (loc == null) ? getDefaultCtasLocation(pCtx) : new Path(loc); + if (mmWriteIdForCtas != null) { + dataSinkForCtas.setDirName(location); + location = new Path(location, ValidWriteIds.getMmFilePrefix(mmWriteIdForCtas)); + lfd.setSourcePath(location); + Utilities.LOG14535.info("Setting MM CTAS to " + location); + } + Utilities.LOG14535.info("Location for LFD is being set to " + location + "; moving from " + lfd.getSourcePath()); + lfd.setTargetDir(location); + } + + private void createColumnStatsTasks(final ParseContext pCtx, + final List<Task<? extends Serializable>> rootTasks, + List<LoadFileDesc> loadFileWork, boolean isCStats, int outerQueryLimit) + throws SemanticException { + Set<Task<? extends Serializable>> leafTasks = new LinkedHashSet<Task<? 
extends Serializable>>(); + getLeafTasks(rootTasks, leafTasks); + if (isCStats) { + genColumnStatsTask(pCtx.getAnalyzeRewrite(), loadFileWork, leafTasks, outerQueryLimit, 0); + } else { + for (ColumnStatsAutoGatherContext columnStatsAutoGatherContext : pCtx + .getColumnStatsAutoGatherContexts()) { + if (!columnStatsAutoGatherContext.isInsertInto()) { + genColumnStatsTask(columnStatsAutoGatherContext.getAnalyzeRewrite(), + columnStatsAutoGatherContext.getLoadFileWork(), leafTasks, outerQueryLimit, 0); + } else { + int numBitVector; + try { + numBitVector = HiveStatsUtils.getNumBitVectorsForNDVEstimation(conf); + } catch (Exception e) { + throw new SemanticException(e.getMessage()); + } + genColumnStatsTask(columnStatsAutoGatherContext.getAnalyzeRewrite(), + columnStatsAutoGatherContext.getLoadFileWork(), leafTasks, outerQueryLimit, numBitVector); + } + } + } + } + + private Path getDefaultCtasLocation(final ParseContext pCtx) throws SemanticException { + try { + String protoName = null; + if (pCtx.getQueryProperties().isCTAS()) { + protoName = pCtx.getCreateTable().getTableName(); + } else if (pCtx.getQueryProperties().isMaterializedView()) { + protoName = pCtx.getCreateViewDesc().getViewName(); + } + String[] names = Utilities.getDbTableName(protoName); + if (!db.databaseExists(names[0])) { + throw new SemanticException("ERROR: The database " + names[0] + " does not exist."); + } + Warehouse wh = new Warehouse(conf); + return wh.getTablePath(db.getDatabase(names[0]), names[1]); + } catch (HiveException e) { + throw new SemanticException(e); + } catch (MetaException e) { + throw new SemanticException(e); + } + } + private void patchUpAfterCTASorMaterializedView(final List<Task<? extends Serializable>> rootTasks, final HashSet<WriteEntity> outputs, Task<? extends Serializable> createTask) { http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java index eafba21..7609068 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java @@ -94,6 +94,10 @@ public class CreateTableDesc extends DDLDesc implements Serializable { private boolean isCTAS = false; List<SQLPrimaryKey> primaryKeys; List<SQLForeignKey> foreignKeys; + private Long initialWriteId; + // The FSOP configuration for the FSOP that is going to write initial data during ctas. + // This is not needed beyond compilation, so it is transient. 
+ private transient FileSinkDesc writer; public CreateTableDesc() { } @@ -825,5 +829,23 @@ public class CreateTableDesc extends DDLDesc implements Serializable { return tbl; } + public void setInitialWriteId(Long mmWriteId) { + this.initialWriteId = mmWriteId; + } + + public Long getInitialWriteId() { + return initialWriteId; + } + + public FileSinkDesc getAndUnsetWriter() { + FileSinkDesc fsd = writer; + writer = null; + return fsd; + } + + public void setWriter(FileSinkDesc writer) { + this.writer = writer; + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java index 8bef7a9..d7d6e38 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java @@ -82,7 +82,6 @@ public class FileSinkDesc extends AbstractOperatorDesc { // the sub-queries write to sub-directories of a common directory. So, the file sink // descriptors for subq1 and subq2 are linked. private boolean linkedFileSink = false; - private Path parentDir; transient private List<FileSinkDesc> linkedFileSinkDesc; private boolean statsReliable; @@ -152,7 +151,6 @@ public class FileSinkDesc extends AbstractOperatorDesc { ret.setStaticSpec(staticSpec); ret.setStatsAggPrefix(statsKeyPref); ret.setLinkedFileSink(linkedFileSink); - ret.setParentDir(parentDir); ret.setLinkedFileSinkDesc(linkedFileSinkDesc); ret.setStatsReliable(statsReliable); ret.setDpSortState(dpSortState); @@ -180,7 +178,7 @@ public class FileSinkDesc extends AbstractOperatorDesc { } public Path getFinalDirName() { - return linkedFileSink ? parentDir : dirName; + return linkedFileSink ? dirName.getParent() : dirName; } /** getFinalDirName that takes into account MM, but not DP, LB or buckets. 
*/ @@ -395,11 +393,7 @@ public class FileSinkDesc extends AbstractOperatorDesc { } public Path getParentDir() { - return parentDir; - } - - public void setParentDir(Path parentDir) { - this.parentDir = parentDir; + return dirName.getParent(); } public boolean isStatsReliable() { http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java index bcd3125..d46f71e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java @@ -42,5 +42,8 @@ public class LoadDesc implements Serializable { public Path getSourcePath() { return sourcePath; } - + + public void setSourcePath(Path path) { + this.sourcePath = path; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java index 064a864..7670ef2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java @@ -60,7 +60,7 @@ public class LoadFileDesc extends LoadDesc implements Serializable { final boolean isDfsDir, final String columns, final String columnTypes) { super(sourcePath); - Utilities.LOG14535.info("creating LFD from " + sourcePath + " to " + targetDir/*, new Exception()*/); + Utilities.LOG14535.info("creating LFD from " + sourcePath + " to " + targetDir, new Exception()); this.targetDir = targetDir; this.isDfsDir = isDfsDir; this.columns = columns; http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java index f0b2775..3ada134 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java @@ -70,7 +70,7 @@ public class MoveWork implements Serializable { boolean checkFileFormat, boolean srcLocal) { this(inputs, outputs); Utilities.LOG14535.info("Creating MoveWork " + System.identityHashCode(this) - + " with " + loadTableWork + "; " + loadFileWork); + + " with " + loadTableWork + "; " + loadFileWork, new Exception()); this.loadTableWork = loadTableWork; this.loadFileWork = loadFileWork; this.checkFileFormat = checkFileFormat; http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/test/queries/clientpositive/mm_all.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/mm_all.q b/ql/src/test/queries/clientpositive/mm_all.q index 8ce42a2..8163d2f 100644 --- a/ql/src/test/queries/clientpositive/mm_all.q +++ b/ql/src/test/queries/clientpositive/mm_all.q @@ -162,30 +162,22 @@ select * from merge1_mm; drop table merge1_mm; +-- TODO: need to include merge+union+DP, but it's broken for now --- TODO: need to include merge+union, but it's broken for now +drop table ctas0_mm; +create table ctas0_mm tblproperties ('hivecommit'='true') as select * from intermediate; +select * from ctas0_mm; 
+drop table ctas0_mm; +drop table ctas1_mm; +create table ctas1_mm tblproperties ('hivecommit'='true') as + select * from intermediate union all select * from intermediate; +select * from ctas1_mm; +drop table ctas1_mm; - --- future - - - - - - - ---drop table ctas_mm; --- --- ---create table ctas_mm tblproperties ('hivecommit'='true') as select * from src limit 3; --- --- ----- TODO load, multi-insert etc --- --- +-- TODO load, multi-insert, buckets drop table intermediate; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/test/queries/clientpositive/mm_current.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/mm_current.q b/ql/src/test/queries/clientpositive/mm_current.q index f423b00..f2d353f 100644 --- a/ql/src/test/queries/clientpositive/mm_current.q +++ b/ql/src/test/queries/clientpositive/mm_current.q @@ -12,15 +12,13 @@ insert into table intermediate partition(p='455') select distinct key from src w insert into table intermediate partition(p='456') select distinct key from src where key is not null order by key asc limit 2; -set hive.merge.orcfile.stripe.level=true; -set hive.merge.tezfiles=true; -set hive.merge.mapfiles=true; -set hive.merge.mapredfiles=true; - - +create table ctas1_mm tblproperties ('hivecommit'='true') as + select * from intermediate union all select * from intermediate; +select * from ctas1_mm; +drop table ctas1_mm; drop table intermediate; http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/test/results/clientpositive/llap/mm_all.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/mm_all.q.out b/ql/src/test/results/clientpositive/llap/mm_all.q.out index f8001c2..93716de 100644 --- a/ql/src/test/results/clientpositive/llap/mm_all.q.out +++ b/ql/src/test/results/clientpositive/llap/mm_all.q.out @@ -154,11 +154,11 @@ POSTHOOK: Input: default@part_mm@key_mm=456 10 455 10 455 97 455 -97 456 97 455 -98 455 +97 456 98 456 98 455 +98 455 PREHOOK: query: drop table part_mm PREHOOK: type: DROPTABLE PREHOOK: Input: default@part_mm @@ -903,59 +903,105 @@ POSTHOOK: query: drop table merge1_mm POSTHOOK: type: DROPTABLE POSTHOOK: Input: default@merge1_mm POSTHOOK: Output: default@merge1_mm -PREHOOK: query: -- TODO: need to include merge+union, but it's broken for now - - - - - - --- future - - - +PREHOOK: query: -- TODO: need to include merge+union+DP, but it's broken for now +drop table ctas0_mm +PREHOOK: type: DROPTABLE +POSTHOOK: query: -- TODO: need to include merge+union+DP, but it's broken for now ---drop table ctas_mm; --- --- ---create table ctas_mm tblproperties ('hivecommit'='true') as select * from src limit 3; --- --- ----- TODO load, multi-insert etc --- --- +drop table ctas0_mm +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table ctas0_mm tblproperties ('hivecommit'='true') as select * from intermediate +PREHOOK: type: CREATETABLE_AS_SELECT +PREHOOK: Input: default@intermediate +PREHOOK: Input: default@intermediate@p=455 +PREHOOK: Input: default@intermediate@p=456 +PREHOOK: Output: database:default +PREHOOK: Output: default@ctas0_mm +POSTHOOK: query: create table ctas0_mm tblproperties ('hivecommit'='true') as select * from intermediate +POSTHOOK: type: CREATETABLE_AS_SELECT +POSTHOOK: Input: default@intermediate +POSTHOOK: Input: default@intermediate@p=455 +POSTHOOK: Input: default@intermediate@p=456 +POSTHOOK: Output: database:default +POSTHOOK: Output: 
default@ctas0_mm +POSTHOOK: Lineage: ctas0_mm.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: ctas0_mm.p SIMPLE [(intermediate)intermediate.FieldSchema(name:p, type:int, comment:null), ] +PREHOOK: query: select * from ctas0_mm +PREHOOK: type: QUERY +PREHOOK: Input: default@ctas0_mm +#### A masked pattern was here #### +POSTHOOK: query: select * from ctas0_mm +POSTHOOK: type: QUERY +POSTHOOK: Input: default@ctas0_mm +#### A masked pattern was here #### +98 455 +97 455 +0 456 +10 456 +PREHOOK: query: drop table ctas0_mm +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@ctas0_mm +PREHOOK: Output: default@ctas0_mm +POSTHOOK: query: drop table ctas0_mm +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@ctas0_mm +POSTHOOK: Output: default@ctas0_mm +PREHOOK: query: drop table ctas1_mm +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table ctas1_mm +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table ctas1_mm tblproperties ('hivecommit'='true') as + select * from intermediate union all select * from intermediate +PREHOOK: type: CREATETABLE_AS_SELECT +PREHOOK: Input: default@intermediate +PREHOOK: Input: default@intermediate@p=455 +PREHOOK: Input: default@intermediate@p=456 +PREHOOK: Output: database:default +PREHOOK: Output: default@ctas1_mm +POSTHOOK: query: create table ctas1_mm tblproperties ('hivecommit'='true') as + select * from intermediate union all select * from intermediate +POSTHOOK: type: CREATETABLE_AS_SELECT +POSTHOOK: Input: default@intermediate +POSTHOOK: Input: default@intermediate@p=455 +POSTHOOK: Input: default@intermediate@p=456 +POSTHOOK: Output: database:default +POSTHOOK: Output: default@ctas1_mm +POSTHOOK: Lineage: ctas1_mm.key EXPRESSION [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: ctas1_mm.p EXPRESSION [(intermediate)intermediate.FieldSchema(name:p, type:int, comment:null), ] +PREHOOK: query: select * from ctas1_mm +PREHOOK: type: QUERY +PREHOOK: Input: default@ctas1_mm +#### A masked pattern was here #### +POSTHOOK: query: select * from ctas1_mm +POSTHOOK: type: QUERY +POSTHOOK: Input: default@ctas1_mm +#### A masked pattern was here #### +98 455 +97 455 +0 456 +10 456 +98 455 +97 455 +0 456 +10 456 +PREHOOK: query: drop table ctas1_mm +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@ctas1_mm +PREHOOK: Output: default@ctas1_mm +POSTHOOK: query: drop table ctas1_mm +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@ctas1_mm +POSTHOOK: Output: default@ctas1_mm +PREHOOK: query: -- TODO load, multi-insert, buckets drop table intermediate PREHOOK: type: DROPTABLE PREHOOK: Input: default@intermediate PREHOOK: Output: default@intermediate -POSTHOOK: query: -- TODO: need to include merge+union, but it's broken for now - - - - - - --- future - - - - - - - ---drop table ctas_mm; --- --- ---create table ctas_mm tblproperties ('hivecommit'='true') as select * from src limit 3; --- --- ----- TODO load, multi-insert etc --- --- +POSTHOOK: query: -- TODO load, multi-insert, buckets drop table intermediate POSTHOOK: type: DROPTABLE http://git-wip-us.apache.org/repos/asf/hive/blob/af4ff378/ql/src/test/results/clientpositive/llap/mm_current.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/mm_current.q.out b/ql/src/test/results/clientpositive/llap/mm_current.q.out index 4d28c63..5b51fa3 100644 --- a/ql/src/test/results/clientpositive/llap/mm_current.q.out +++ 
b/ql/src/test/results/clientpositive/llap/mm_current.q.out @@ -28,6 +28,48 @@ POSTHOOK: type: QUERY POSTHOOK: Input: default@src POSTHOOK: Output: default@intermediate@p=456 POSTHOOK: Lineage: intermediate PARTITION(p=456).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +PREHOOK: query: create table ctas1_mm tblproperties ('hivecommit'='true') as + select * from intermediate union all select * from intermediate +PREHOOK: type: CREATETABLE_AS_SELECT +PREHOOK: Input: default@intermediate +PREHOOK: Input: default@intermediate@p=455 +PREHOOK: Input: default@intermediate@p=456 +PREHOOK: Output: database:default +PREHOOK: Output: default@ctas1_mm +POSTHOOK: query: create table ctas1_mm tblproperties ('hivecommit'='true') as + select * from intermediate union all select * from intermediate +POSTHOOK: type: CREATETABLE_AS_SELECT +POSTHOOK: Input: default@intermediate +POSTHOOK: Input: default@intermediate@p=455 +POSTHOOK: Input: default@intermediate@p=456 +POSTHOOK: Output: database:default +POSTHOOK: Output: default@ctas1_mm +POSTHOOK: Lineage: ctas1_mm.key EXPRESSION [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: ctas1_mm.p EXPRESSION [(intermediate)intermediate.FieldSchema(name:p, type:int, comment:null), ] +PREHOOK: query: select * from ctas1_mm +PREHOOK: type: QUERY +PREHOOK: Input: default@ctas1_mm +#### A masked pattern was here #### +POSTHOOK: query: select * from ctas1_mm +POSTHOOK: type: QUERY +POSTHOOK: Input: default@ctas1_mm +#### A masked pattern was here #### +98 455 +97 455 +0 456 +10 456 +98 455 +97 455 +0 456 +10 456 +PREHOOK: query: drop table ctas1_mm +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@ctas1_mm +PREHOOK: Output: default@ctas1_mm +POSTHOOK: query: drop table ctas1_mm +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@ctas1_mm +POSTHOOK: Output: default@ctas1_mm PREHOOK: query: drop table intermediate PREHOOK: type: DROPTABLE PREHOOK: Input: default@intermediate