Repository: hive Updated Branches: refs/heads/master 53e01e4a5 -> d43938ca1
HIVE-13961 : ACID: Major compaction fails to include the original bucket files if there's no delta directory (Wei Zheng, reviewed by Eugene Koifman) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d43938ca Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d43938ca Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d43938ca Branch: refs/heads/master Commit: d43938ca127a849e50e8eaddd6313d409cea6770 Parents: 53e01e4 Author: Wei Zheng <w...@apache.org> Authored: Wed Jun 15 10:19:30 2016 -0700 Committer: Wei Zheng <w...@apache.org> Committed: Wed Jun 15 10:19:30 2016 -0700 ---------------------------------------------------------------------- .../hive/ql/txn/compactor/CompactorMR.java | 7 +- .../apache/hadoop/hive/ql/TestTxnCommands2.java | 308 ++++++++++++++++++- 2 files changed, 310 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/d43938ca/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 71e69d5..6caca98 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -254,10 +254,9 @@ public class CompactorMR { } } - if (parsedDeltas.size() == 0) { - // Seriously, no deltas? Can't compact that. - LOG.error( "No delta files found to compact in " + sd.getLocation()); - //couldn't someone want to run a Major compaction to convert old table to ACID? 
+ if (parsedDeltas.size() == 0 && dir.getOriginalFiles() == null) { + // Skip compaction if there's no delta files AND there's no original files + LOG.error("No delta files or original files found to compact in " + sd.getLocation()); return; } http://git-wip-us.apache.org/repos/asf/hive/blob/d43938ca/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index a1bd0fb..e76c925 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -270,10 +270,15 @@ public class TestTxnCommands2 { /** * Test the query correctness and directory layout after ACID table conversion and MAJOR compaction + * 1. Insert a row to Non-ACID table + * 2. Convert Non-ACID to ACID table + * 3. Insert a row to ACID table + * 4. Perform Major compaction + * 5. Clean * @throws Exception */ @Test - public void testNonAcidToAcidConversionAndMajorCompaction() throws Exception { + public void testNonAcidToAcidConversion1() throws Exception { FileSystem fs = FileSystem.get(hiveConf); FileStatus[] status; @@ -394,6 +399,307 @@ public class TestTxnCommands2 { Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); } + /** + * Test the query correctness and directory layout after ACID table conversion and MAJOR compaction + * 1. Insert a row to Non-ACID table + * 2. Convert Non-ACID to ACID table + * 3. Update the existing row in ACID table + * 4. Perform Major compaction + * 5. Clean + * @throws Exception + */ + @Test + public void testNonAcidToAcidConversion2() throws Exception { + FileSystem fs = FileSystem.get(hiveConf); + FileStatus[] status; + + // 1. 
Insert a row to Non-ACID table + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)"); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 original bucket files in the location (000000_0 and 000001_0) + Assert.assertEquals(BUCKET_COUNT, status.length); + for (int i = 0; i < status.length; i++) { + Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); + } + List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + int [][] resultData = new int[][] {{1, 2}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + int resultCount = 1; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + + // 2. Convert NONACIDORCTBL to ACID table + runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')"); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // Everything should be same as before + Assert.assertEquals(BUCKET_COUNT, status.length); + for (int i = 0; i < status.length; i++) { + Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); + } + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + resultData = new int[][] {{1, 2}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + resultCount = 1; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + + // 3. 
Update the existing row in newly-converted ACID table + runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1"); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 original bucket files (000000_0 and 000001_0), plus a new delta directory. + // The delta directory should also have 2 bucket files (bucket_00000 and bucket_00001) + Assert.assertEquals(3, status.length); + boolean sawNewDelta = false; + for (int i = 0; i < status.length; i++) { + if (status[i].getPath().getName().matches("delta_.*")) { + sawNewDelta = true; + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); + Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + } else { + Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); + } + } + Assert.assertTrue(sawNewDelta); + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + resultData = new int[][] {{1, 3}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + resultCount = 1; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + + // 4. Perform a major compaction + runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'"); + runWorker(hiveConf); + // There should be 1 new directory: base_0000001. + // Original bucket files and delta directory should stay until Cleaner kicks in. 
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(4, status.length); + boolean sawNewBase = false; + for (int i = 0; i < status.length; i++) { + if (status[i].getPath().getName().matches("base_.*")) { + sawNewBase = true; + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); + Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001")); + } + } + Assert.assertTrue(sawNewBase); + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + resultData = new int[][] {{1, 3}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + resultCount = 1; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + + // 5. Let Cleaner delete obsolete files/dirs + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // Before Cleaner, there should be 4 items: + // 2 original files, 1 delta directory and 1 base directory + Assert.assertEquals(4, status.length); + runCleaner(hiveConf); + // There should be only 1 directory left: base_0000001. + // Original bucket files and delta directory should have been cleaned up. 
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(1, status.length); + Assert.assertTrue(status[0].getPath().getName().matches("base_.*")); + FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); + Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001")); + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + resultData = new int[][] {{1, 3}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + resultCount = 1; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + } + + /** + * Test the query correctness and directory layout after ACID table conversion and MAJOR compaction + * 1. Insert a row to Non-ACID table + * 2. Convert Non-ACID to ACID table + * 3. Perform Major compaction + * 4. Update the existing row and insert a new row to ACID table + * 5. Perform another Major compaction + * 6. Clean + * @throws Exception + */ + @Test + public void testNonAcidToAcidConversion3() throws Exception { + FileSystem fs = FileSystem.get(hiveConf); + FileStatus[] status; + + // 1. 
Insert a row to Non-ACID table + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)"); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 original bucket files in the location (000000_0 and 000001_0) + Assert.assertEquals(BUCKET_COUNT, status.length); + for (int i = 0; i < status.length; i++) { + Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); + } + List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + int [][] resultData = new int[][] {{1, 2}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + int resultCount = 1; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + + // 2. Convert NONACIDORCTBL to ACID table + runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')"); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // Everything should be same as before + Assert.assertEquals(BUCKET_COUNT, status.length); + for (int i = 0; i < status.length; i++) { + Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); + } + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + resultData = new int[][] {{1, 2}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + resultCount = 1; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + + // 3. Perform a major compaction + runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'"); + runWorker(hiveConf); + // There should be 1 new directory: base_-9223372036854775808 + // Original bucket files should stay until Cleaner kicks in. 
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(3, status.length); + boolean sawNewBase = false; + for (int i = 0; i < status.length; i++) { + if (status[i].getPath().getName().matches("base_.*")) { + Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName()); + sawNewBase = true; + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); + Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); + } + } + Assert.assertTrue(sawNewBase); + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + resultData = new int[][] {{1, 2}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + resultCount = 1; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + + // 4. 
Update the existing row, and insert another row to newly-converted ACID table + runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1"); + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)"); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Arrays.sort(status); // make sure delta_0000001_0000001_0000 appears before delta_0000002_0000002_0000 + // There should be 2 original bucket files (000000_0 and 000001_0), a base directory, + // plus two new delta directories + Assert.assertEquals(5, status.length); + int numDelta = 0; + sawNewBase = false; + for (int i = 0; i < status.length; i++) { + if (status[i].getPath().getName().matches("delta_.*")) { + numDelta++; + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Arrays.sort(buckets); + if (numDelta == 1) { + Assert.assertEquals("delta_0000001_0000001_0000", status[i].getPath().getName()); + Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); + Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); + } else if (numDelta == 2) { + Assert.assertEquals("delta_0000002_0000002_0000", status[i].getPath().getName()); + Assert.assertEquals(BUCKET_COUNT, buckets.length); + Assert.assertEquals("bucket_00000", buckets[0].getPath().getName()); + Assert.assertEquals("bucket_00001", buckets[1].getPath().getName()); + } + } else if (status[i].getPath().getName().matches("base_.*")) { + Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName()); + sawNewBase = true; + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); + Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); + } else { + Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); + } + } + Assert.assertEquals(2, 
numDelta); + Assert.assertTrue(sawNewBase); + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + resultData = new int[][] {{1, 3}, {3, 4}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + resultCount = 2; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + + // 5. Perform another major compaction + runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'"); + runWorker(hiveConf); + // There should be 1 new base directory: base_0000002 + // Original bucket files, delta directories and the previous base directory should stay until Cleaner kicks in. + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Arrays.sort(status); + Assert.assertEquals(6, status.length); + int numBase = 0; + for (int i = 0; i < status.length; i++) { + if (status[i].getPath().getName().matches("base_.*")) { + numBase++; + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Arrays.sort(buckets); + if (numBase == 1) { + Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName()); + Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); + Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); + } else if (numBase == 2) { + // The new base dir now has two bucket files, since the delta dir has two bucket files + Assert.assertEquals("base_0000002", status[i].getPath().getName()); + Assert.assertEquals(BUCKET_COUNT, buckets.length); + Assert.assertEquals("bucket_00000", buckets[0].getPath().getName()); + Assert.assertEquals("bucket_00001", buckets[1].getPath().getName()); + } + } + } + Assert.assertEquals(2, numBase); + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + resultData = new int[][] {{1, 3}, {3, 4}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs 
= runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + resultCount = 2; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + + // 6. Let Cleaner delete obsolete files/dirs + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // Before Cleaner, there should be 6 items: + // 2 original files, 2 delta directories and 2 base directories + Assert.assertEquals(6, status.length); + runCleaner(hiveConf); + // There should be only 1 directory left: base_0000002. + // Original bucket files, delta directories and previous base directory should have been cleaned up. + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(1, status.length); + Assert.assertEquals("base_0000002", status[0].getPath().getName()); + FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Arrays.sort(buckets); + Assert.assertEquals(BUCKET_COUNT, buckets.length); + Assert.assertEquals("bucket_00000", buckets[0].getPath().getName()); + Assert.assertEquals("bucket_00001", buckets[1].getPath().getName()); + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + resultData = new int[][] {{1, 3}, {3, 4}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + resultCount = 2; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + } + @Test public void testValidTxnsBookkeeping() throws Exception { // 1. Run a query against a non-ACID table, and we shouldn't have txn logged in conf