Repository: hive
Updated Branches:
  refs/heads/master 53e01e4a5 -> d43938ca1


HIVE-13961 : ACID: Major compaction fails to include the original bucket files if there's no delta directory (Wei Zheng, reviewed by Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d43938ca
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d43938ca
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d43938ca

Branch: refs/heads/master
Commit: d43938ca127a849e50e8eaddd6313d409cea6770
Parents: 53e01e4
Author: Wei Zheng <w...@apache.org>
Authored: Wed Jun 15 10:19:30 2016 -0700
Committer: Wei Zheng <w...@apache.org>
Committed: Wed Jun 15 10:19:30 2016 -0700

----------------------------------------------------------------------
 .../hive/ql/txn/compactor/CompactorMR.java      |   7 +-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java | 308 ++++++++++++++++++-
 2 files changed, 310 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d43938ca/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 71e69d5..6caca98 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -254,10 +254,9 @@ public class CompactorMR {
       }
     }
 
-    if (parsedDeltas.size() == 0) {
-      // Seriously, no deltas?  Can't compact that.
-      LOG.error(  "No delta files found to compact in " + sd.getLocation());
-      //couldn't someone want to run a Major compaction to convert old table to ACID?
+    if (parsedDeltas.size() == 0 && dir.getOriginalFiles() == null) {
+      // Skip compaction if there's no delta files AND there's no original files
+      LOG.error("No delta files or original files found to compact in " + sd.getLocation());
       return;
     }
 

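For reference, a minimal stand-alone sketch of the guard this hunk changes (the helper below is illustrative, with List-typed stand-ins for CompactorMR's parsedDeltas and dir.getOriginalFiles(); it is not the exact Hive API):

    import java.util.List;

    class CompactionGuardSketch {
      // Before HIVE-13961: bail out whenever there are no deltas, which
      // wrongly skips tables just converted to ACID that still consist
      // only of original bucket files (000000_0, 000001_0, ...).
      static boolean skippedBefore(List<?> parsedDeltas) {
        return parsedDeltas.size() == 0;
      }

      // After HIVE-13961: skip only when there are neither delta
      // directories nor original files, so a MAJOR compaction can
      // rewrite the original bucket files into a base directory.
      static boolean skippedAfter(List<?> parsedDeltas, List<?> originalFiles) {
        return parsedDeltas.size() == 0 && originalFiles == null;
      }
    }
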
http://git-wip-us.apache.org/repos/asf/hive/blob/d43938ca/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index a1bd0fb..e76c925 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -270,10 +270,15 @@ public class TestTxnCommands2 {
 
   /**
   * Test the query correctness and directory layout after ACID table conversion and MAJOR compaction
+   * 1. Insert a row to Non-ACID table
+   * 2. Convert Non-ACID to ACID table
+   * 3. Insert a row to ACID table
+   * 4. Perform Major compaction
+   * 5. Clean
    * @throws Exception
    */
   @Test
-  public void testNonAcidToAcidConversionAndMajorCompaction() throws Exception {
+  public void testNonAcidToAcidConversion1() throws Exception {
     FileSystem fs = FileSystem.get(hiveConf);
     FileStatus[] status;
 
@@ -394,6 +399,307 @@ public class TestTxnCommands2 {
     Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
   }
 
+  /**
+   * Test the query correctness and directory layout after ACID table conversion and MAJOR compaction
+   * 1. Insert a row to Non-ACID table
+   * 2. Convert Non-ACID to ACID table
+   * 3. Update the existing row in ACID table
+   * 4. Perform Major compaction
+   * 5. Clean
+   * @throws Exception
+   */
+  @Test
+  public void testNonAcidToAcidConversion2() throws Exception {
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] status;
+
+    // 1. Insert a row to Non-ACID table
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) 
values(1,2)");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 original bucket files in the location (000000_0 and 
000001_0)
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    List<String> rs = runStatementOnDriver("select a,b from " + 
Table.NONACIDORCTBL);
+    int [][] resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    int resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 2. Convert NONACIDORCTBL to ACID table
+    runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET 
TBLPROPERTIES ('transactional'='true')");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+    // Everything should be same as before
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 3. Update the existing row in newly-converted ACID table
+    runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where 
a=1");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 original bucket files (000000_0 and 000001_0), plus a 
new delta directory.
+    // The delta directory should also have 2 bucket files (bucket_00000 and 
bucket_00001)
+    Assert.assertEquals(3, status.length);
+    boolean sawNewDelta = false;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("delta_.*")) {
+        sawNewDelta = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+      } else {
+        Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+      }
+    }
+    Assert.assertTrue(sawNewDelta);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 3}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 4. Perform a major compaction
+    runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 
'MAJOR'");
+    runWorker(hiveConf);
+    // There should be 1 new directory: base_0000001.
+    // Original bucket files and delta directory should stay until Cleaner 
kicks in.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(4, status.length);
+    boolean sawNewBase = false;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("base_.*")) {
+        sawNewBase = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
+      }
+    }
+    Assert.assertTrue(sawNewBase);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 3}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 5. Let Cleaner delete obsolete files/dirs
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // Before Cleaner, there should be 4 items:
+    // 2 original files, 1 delta directory and 1 base directory
+    Assert.assertEquals(4, status.length);
+    runCleaner(hiveConf);
+    // There should be only 1 directory left: base_0000001.
+    // Original bucket files and delta directory should have been cleaned up.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(1, status.length);
+    Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
+    FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+    Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 3}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+  }
+
+  /**
+   * Test the query correctness and directory layout after ACID table conversion and MAJOR compaction
+   * 1. Insert a row to Non-ACID table
+   * 2. Convert Non-ACID to ACID table
+   * 3. Perform Major compaction
+   * 4. Insert a new row to ACID table
+   * 5. Perform another Major compaction
+   * 6. Clean
+   * @throws Exception
+   */
+  @Test
+  public void testNonAcidToAcidConversion3() throws Exception {
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] status;
+
+    // 1. Insert a row to Non-ACID table
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) 
values(1,2)");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 original bucket files in the location (000000_0 and 
000001_0)
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    int [][] resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    int resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 2. Convert NONACIDORCTBL to ACID table
+    runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET 
TBLPROPERTIES ('transactional'='true')");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+    // Everything should be same as before
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 3. Perform a major compaction
+    runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 
'MAJOR'");
+    runWorker(hiveConf);
+    // There should be 1 new directory: base_-9223372036854775808
+    // Original bucket files should stay until Cleaner kicks in.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(3, status.length);
+    boolean sawNewBase = false;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("base_.*")) {
+        Assert.assertEquals("base_-9223372036854775808", 
status[i].getPath().getName());
+        sawNewBase = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), 
FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+        Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+      }
+    }
+    Assert.assertTrue(sawNewBase);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 4. Update the existing row, and insert another row to newly-converted ACID table
+    runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1");
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Arrays.sort(status);  // make sure delta_0000001_0000001_0000 appears before delta_0000002_0000002_0000
+    // There should be 2 original bucket files (000000_0 and 000001_0), a base directory,
+    // plus two new delta directories
+    Assert.assertEquals(5, status.length);
+    int numDelta = 0;
+    sawNewBase = false;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("delta_.*")) {
+        numDelta++;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Arrays.sort(buckets);
+        if (numDelta == 1) {
+          Assert.assertEquals("delta_0000001_0000001_0000", 
status[i].getPath().getName());
+          Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+        } else if (numDelta == 2) {
+          Assert.assertEquals("delta_0000002_0000002_0000", 
status[i].getPath().getName());
+          Assert.assertEquals(BUCKET_COUNT, buckets.length);
+          Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
+          Assert.assertEquals("bucket_00001", buckets[1].getPath().getName());
+        }
+      } else if (status[i].getPath().getName().matches("base_.*")) {
+        Assert.assertEquals("base_-9223372036854775808", 
status[i].getPath().getName());
+        sawNewBase = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), 
FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+        Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+      } else {
+        
Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+      }
+    }
+    Assert.assertEquals(2, numDelta);
+    Assert.assertTrue(sawNewBase);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 3}, {3, 4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 5. Perform another major compaction
+    runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 
'MAJOR'");
+    runWorker(hiveConf);
+    // There should be 1 new base directory: base_0000001
+    // Original bucket files, delta directories and the previous base 
directory should stay until Cleaner kicks in.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+    Arrays.sort(status);
+    Assert.assertEquals(6, status.length);
+    int numBase = 0;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("base_.*")) {
+        numBase++;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Arrays.sort(buckets);
+        if (numBase == 1) {
+          Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName());
+          Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+        } else if (numBase == 2) {
+          // The new base dir now has two bucket files, since the delta dir has two bucket files
+          Assert.assertEquals("base_0000002", status[i].getPath().getName());
+          Assert.assertEquals(BUCKET_COUNT, buckets.length);
+          Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
+          Assert.assertEquals("bucket_00001", buckets[1].getPath().getName());
+        }
+      }
+    }
+    Assert.assertEquals(2, numBase);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 3}, {3, 4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 6. Let Cleaner delete obsolete files/dirs
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // Before Cleaner, there should be 6 items:
+    // 2 original files, 2 delta directories and 2 base directories
+    Assert.assertEquals(6, status.length);
+    runCleaner(hiveConf);
+    // There should be only 1 directory left: base_0000002.
+    // Original bucket files, delta directories and previous base directory should have been cleaned up.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(1, status.length);
+    Assert.assertEquals("base_0000002", status[0].getPath().getName());
+    FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+    Arrays.sort(buckets);
+    Assert.assertEquals(BUCKET_COUNT, buckets.length);
+    Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
+    Assert.assertEquals("bucket_00001", buckets[1].getPath().getName());
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 3}, {3, 4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+  }
+
   @Test
   public void testValidTxnsBookkeeping() throws Exception {
     // 1. Run a query against a non-ACID table, and we shouldn't have txn logged in conf

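Taken together, the new tests pin down the on-disk lifecycle of a two-bucket table converted from non-ACID to ACID (a summary of the assertions above; transaction ids are illustrative):

    000000_0, 000001_0            original bucket files written before conversion
    delta_0000001_0000001_0000/   per-transaction delta directories holding bucket_0000N files
    base_0000002/                 produced by MAJOR compaction (base_-9223372036854775808 when only
                                  original files exist); the sole survivor after the Cleaner removes
                                  the obsolete original files, delta directories and any older base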