Repository: hive Updated Branches: refs/heads/master 87860fbca -> dfd7ea368
HIVE-18429 - Compaction should handle a case when it produces no output (Eugene Koifman, reviewed by Prasanth Jayachandran) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fe3190d1 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fe3190d1 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fe3190d1 Branch: refs/heads/master Commit: fe3190d19fa1324a6835692e71f747e6cb37c7d1 Parents: 87860fb Author: Eugene Koifman <ekoif...@hortonworks.com> Authored: Tue Jan 16 09:30:09 2018 -0800 Committer: Eugene Koifman <ekoif...@hortonworks.com> Committed: Tue Jan 16 09:31:55 2018 -0800 ---------------------------------------------------------------------- .../hive/ql/txn/compactor/CompactorMR.java | 29 ++++++++++-- .../apache/hadoop/hive/ql/TestTxnNoBuckets.java | 48 +++++++++++++++++++- 2 files changed, 71 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/fe3190d1/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 26f5c7e..a64c9d1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -295,7 +295,7 @@ public class CompactorMR { } /** - * @param baseDir if not null, it's either table/partition root folder or base_xxxx. + * @param baseDir if not null, it's either table/partition root folder or base_xxxx. 
* If it's base_xxxx, it's in dirsToSearch, else the actual original files * (all leaves recursively) are in the dirsToSearch list */ @@ -915,13 +915,32 @@ public class CompactorMR { FileSystem fs = tmpLocation.getFileSystem(conf); LOG.debug("Moving contents of " + tmpLocation.toString() + " to " + finalLocation.toString()); - + if(!fs.exists(tmpLocation)) { + /** + * No 'tmpLocation' may happen if the job created 0 splits, which happens if all + * input delta and/or base files were empty or had + * only {@link org.apache.orc.impl.OrcAcidUtils#getSideFile(Path)} files. + * So make sure the new base/delta is created. + */ + AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) + .writingBase(conf.getBoolean(IS_MAJOR, false)) + .isCompressed(conf.getBoolean(IS_COMPRESSED, false)) + .minimumTransactionId(conf.getLong(MIN_TXN, Long.MAX_VALUE)) + .maximumTransactionId(conf.getLong(MAX_TXN, Long.MIN_VALUE)) + .bucket(0) + .statementId(-1); + Path newDeltaDir = AcidUtils.createFilename(finalLocation, options).getParent(); + LOG.info(context.getJobID() + ": " + tmpLocation + + " not found. Assuming 0 splits. 
Creating " + newDeltaDir); + fs.mkdirs(newDeltaDir); + return; + } FileStatus[] contents = fs.listStatus(tmpLocation);//expect 1 base or delta dir in this list //we have MIN_TXN, MAX_TXN and IS_MAJOR in JobConf so we could figure out exactly what the dir //name is that we want to rename; leave it for another day - for (int i = 0; i < contents.length; i++) { - Path newPath = new Path(finalLocation, contents[i].getPath().getName()); - fs.rename(contents[i].getPath(), newPath); + for (FileStatus fileStatus : contents) { + Path newPath = new Path(finalLocation, fileStatus.getPath().getName()); + fs.rename(fileStatus.getPath(), newPath); } fs.delete(tmpLocation, true); } http://git-wip-us.apache.org/repos/asf/hive/blob/fe3190d1/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java index ac44779..bbf6c86 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java @@ -17,11 +17,14 @@ */ package org.apache.hadoop.hive.ql; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; @@ -697,9 +700,9 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver map = hms.getPartitionColumnStatistics("default","T", partNames, colNames); 
Assert.assertEquals("", 5, map.get(partNames.get(0)).get(0).getStatsData().getLongStats().getHighValue()); } - @Ignore("enable after HIVE-18294") @Test public void testDefault() throws Exception { + hiveConf.set(MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID.getVarname(), "true"); runStatementOnDriver("drop table if exists T"); runStatementOnDriver("create table T (a int, b int) stored as orc"); runStatementOnDriver("insert into T values(1,2),(3,4)"); @@ -711,6 +714,49 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver {"{\"transactionid\":15,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000015_0000015_0000/bucket_00000"} }; checkExpected(rs, expected, "insert data"); + } + /** + * see HIVE-18429 + */ + @Test + public void testEmptyCompactionResult() throws Exception { + hiveConf.set(MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID.getVarname(), "true"); + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("create table T (a int, b int) stored as orc"); + int[][] data = {{1,2}, {3,4}}; + runStatementOnDriver("insert into T" + makeValuesClause(data)); + runStatementOnDriver("insert into T" + makeValuesClause(data)); + + //delete the bucket files so now we have empty delta dirs + List<String> rs = runStatementOnDriver("select distinct INPUT__FILE__NAME from T"); + FileSystem fs = FileSystem.get(hiveConf); + for(String path : rs) { + fs.delete(new Path(path), true); + } + runStatementOnDriver("alter table T compact 'major'"); + TestTxnCommands2.runWorker(hiveConf); + + //check status of compaction job + TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); + ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize()); + Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState()); + 
Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId().startsWith("job_local")); + + //now run another compaction make sure empty dirs don't cause issues + runStatementOnDriver("insert into T" + makeValuesClause(data)); + runStatementOnDriver("alter table T compact 'major'"); + TestTxnCommands2.runWorker(hiveConf); + + //check status of compaction job + resp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertEquals("Unexpected number of compactions in history", 2, resp.getCompactsSize()); + for(int i = 0; i < 2; i++) { + Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(i).getState()); + Assert.assertTrue(resp.getCompacts().get(i).getHadoopJobId().startsWith("job_local")); + } + rs = runStatementOnDriver("select a, b from T order by a, b"); + Assert.assertEquals(stringifyValues(data), rs); } }