Repository: hive
Updated Branches:
  refs/heads/master 87860fbca -> dfd7ea368


HIVE-18429 - Compaction should handle a case when it produces no output (Eugene Koifman, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fe3190d1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fe3190d1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fe3190d1

Branch: refs/heads/master
Commit: fe3190d19fa1324a6835692e71f747e6cb37c7d1
Parents: 87860fb
Author: Eugene Koifman <ekoif...@hortonworks.com>
Authored: Tue Jan 16 09:30:09 2018 -0800
Committer: Eugene Koifman <ekoif...@hortonworks.com>
Committed: Tue Jan 16 09:31:55 2018 -0800

----------------------------------------------------------------------
 .../hive/ql/txn/compactor/CompactorMR.java      | 29 ++++++++++--
 .../apache/hadoop/hive/ql/TestTxnNoBuckets.java | 48 +++++++++++++++++++-
 2 files changed, 71 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/fe3190d1/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 26f5c7e..a64c9d1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -295,7 +295,7 @@ public class CompactorMR {
   }
 
   /**
-   * @param baseDir if not null, it's either table/partition root folder or base_xxxx.  
+   * @param baseDir if not null, it's either table/partition root folder or base_xxxx.
    *                If it's base_xxxx, it's in dirsToSearch, else the actual original files
    *                (all leaves recursively) are in the dirsToSearch list
    */
@@ -915,13 +915,32 @@ public class CompactorMR {
       FileSystem fs = tmpLocation.getFileSystem(conf);
       LOG.debug("Moving contents of " + tmpLocation.toString() + " to " +
           finalLocation.toString());
-
+      if(!fs.exists(tmpLocation)) {
+        /**
+         * No 'tmpLocation' may happen if the job generated 0 splits, which happens if all
+         * input delta and/or base files were empty or had
+         * only {@link org.apache.orc.impl.OrcAcidUtils#getSideFile(Path)} files.
+         * So make sure the new base/delta is created.
+         */
+        AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+            .writingBase(conf.getBoolean(IS_MAJOR, false))
+            .isCompressed(conf.getBoolean(IS_COMPRESSED, false))
+            .minimumTransactionId(conf.getLong(MIN_TXN, Long.MAX_VALUE))
+            .maximumTransactionId(conf.getLong(MAX_TXN, Long.MIN_VALUE))
+            .bucket(0)
+            .statementId(-1);
+        Path newDeltaDir = AcidUtils.createFilename(finalLocation, options).getParent();
+        LOG.info(context.getJobID() + ": " + tmpLocation +
+            " not found.  Assuming 0 splits.  Creating " + newDeltaDir);
+        fs.mkdirs(newDeltaDir);
+        return;
+      }
      FileStatus[] contents = fs.listStatus(tmpLocation);//expect 1 base or delta dir in this list
      //we have MIN_TXN, MAX_TXN and IS_MAJOR in JobConf so we could figure out exactly what the dir
      //name is that we want to rename; leave it for another day
-      for (int i = 0; i < contents.length; i++) {
-        Path newPath = new Path(finalLocation, contents[i].getPath().getName());
-        fs.rename(contents[i].getPath(), newPath);
+      for (FileStatus fileStatus : contents) {
+        Path newPath = new Path(finalLocation, fileStatus.getPath().getName());
+        fs.rename(fileStatus.getPath(), newPath);
       }
       fs.delete(tmpLocation, true);
     }
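
For context, a minimal standalone sketch of the directory-name derivation used in the new branch above, assuming a major compaction over transactions 1..15; the HiveConf setup, table path, and literal ids below are illustrative, not part of the patch:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class EmptyCompactionDirSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // writingBase(true) corresponds to a major compaction (IS_MAJOR=true);
        // a minor compaction (writingBase(false)) yields a delta_x_y name instead.
        AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
            .writingBase(true)
            .isCompressed(false)
            .minimumTransactionId(1L)
            .maximumTransactionId(15L)
            .bucket(0)
            .statementId(-1);
        Path finalLocation = new Path("/warehouse/t");  // hypothetical table root
        // createFilename() names a bucket file inside the base/delta dir;
        // getParent() strips the bucket file, leaving the dir that gets mkdirs()'d
        Path newDir = AcidUtils.createFilename(finalLocation, options).getParent();
        System.out.println(newDir);  // e.g. /warehouse/t/base_0000015
      }
    }

This is why the fix only needs fs.mkdirs(newDeltaDir): an empty base/delta directory is a valid compaction result, so downstream readers and the cleaning phase see a consistent directory layout.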

http://git-wip-us.apache.org/repos/asf/hive/blob/fe3190d1/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
index ac44779..bbf6c86 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
@@ -17,11 +17,14 @@
  */
 package org.apache.hadoop.hive.ql;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -697,9 +700,9 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
     map = hms.getPartitionColumnStatistics("default","T", partNames, colNames);
     Assert.assertEquals("", 5, map.get(partNames.get(0)).get(0).getStatsData().getLongStats().getHighValue());
   }
-  @Ignore("enable after HIVE-18294")
   @Test
   public void testDefault() throws Exception {
+    hiveConf.set(MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID.getVarname(), "true");
     runStatementOnDriver("drop table if exists T");
     runStatementOnDriver("create table T (a int, b int) stored as orc");
     runStatementOnDriver("insert into T values(1,2),(3,4)");
@@ -711,6 +714,49 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
       {"{\"transactionid\":15,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000015_0000015_0000/bucket_00000"}
     };
     checkExpected(rs, expected, "insert data");
+  }
+  /**
+   * see HIVE-18429
+   */
+  @Test
+  public void testEmptyCompactionResult() throws Exception {
+    hiveConf.set(MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID.getVarname(), "true");
+    runStatementOnDriver("drop table if exists T");
+    runStatementOnDriver("create table T (a int, b int) stored as orc");
+    int[][] data = {{1,2}, {3,4}};
+    runStatementOnDriver("insert into T" + makeValuesClause(data));
+    runStatementOnDriver("insert into T" + makeValuesClause(data));
+
+    //delete the bucket files so now we have empty delta dirs
+    List<String> rs = runStatementOnDriver("select distinct INPUT__FILE__NAME from T");
+    FileSystem fs = FileSystem.get(hiveConf);
+    for(String path : rs) {
+      fs.delete(new Path(path), true);
+    }
+    runStatementOnDriver("alter table T compact 'major'");
+    TestTxnCommands2.runWorker(hiveConf);
+
+    //check status of compaction job
+    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+    ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
+    Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
+    Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId().startsWith("job_local"));
+
+    //now run another compaction make sure empty dirs don't cause issues
+    runStatementOnDriver("insert into T" + makeValuesClause(data));
+    runStatementOnDriver("alter table T compact 'major'");
+    TestTxnCommands2.runWorker(hiveConf);
+
+    //check status of compaction job
+    resp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals("Unexpected number of compactions in history", 2, resp.getCompactsSize());
+    for(int i = 0; i < 2; i++) {
+      Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(i).getState());
+      Assert.assertTrue(resp.getCompacts().get(i).getHadoopJobId().startsWith("job_local"));
+    }
+    rs = runStatementOnDriver("select a, b from T order by a, b");
+    Assert.assertEquals(stringifyValues(data), rs);
 
   }
 }
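
A possible extra assertion after the first runWorker() call, sketched here for illustration only (FileStatus would need importing, and getWarehouseDir() plus AcidUtils.baseFileFilter are assumed from the surrounding test infrastructure), would verify that the empty base directory actually materialized:

    // hypothetical: the fix should have created an (empty) base_N dir even
    // though the compaction job had no input splits and wrote no rows
    FileStatus[] bases = fs.listStatus(new Path(getWarehouseDir() + "/t"),
        AcidUtils.baseFileFilter);
    Assert.assertEquals("expected exactly one base dir", 1, bases.length);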
