HIVE-14035 Enable predicate pushdown to delta files created by ACID Transactions (Saket Saurabh via Eugene Koifman)
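For readers skimming the patch: the new table property 'transactional_properties' ('default' or 'legacy') is parsed into a bit vector that is carried into the job configuration via hive.txn.operational.properties. Below is a minimal, self-contained sketch of that encoding, mirroring the parseString()/toInt() logic of the AcidUtils.AcidOperationalProperties class added in this diff; the class name AcidOpPropsSketch and its main() driver are illustrative only and not part of this commit.

// Illustrative sketch only (not part of commit ecab0d07): mirrors the bit-flag
// encoding used by AcidUtils.AcidOperationalProperties in the diff below.
public class AcidOpPropsSketch {
  static final int SPLIT_UPDATE_BIT = 0x01;     // set by 'split_update'
  static final int HASH_BASED_MERGE_BIT = 0x02; // set by 'hash_merge' (currently unimplemented)

  static int parse(String propertiesStr) {
    if (propertiesStr == null || propertiesStr.equalsIgnoreCase("legacy")) {
      return 0;                                 // legacy mode: no bits set
    }
    if (propertiesStr.equalsIgnoreCase("default")) {
      return SPLIT_UPDATE_BIT;                  // default mode: split-update on
    }
    int description = 0;
    for (String option : propertiesStr.split("\\|")) {
      if (option.trim().isEmpty()) continue;    // ignore empty segments
      switch (option) {
        case "split_update":  description |= SPLIT_UPDATE_BIT; break;
        case "hash_merge":    description |= HASH_BASED_MERGE_BIT; break;
        default: throw new IllegalArgumentException(
            "Unexpected value " + option + " for ACID operational properties!");
      }
    }
    return description;
  }

  public static void main(String[] args) {
    System.out.println(parse("default"));                 // 1 -> split-update on
    System.out.println(parse("legacy"));                  // 0 -> legacy behavior
    System.out.println(parse("split_update|hash_merge")); // 3 -> both bits set
  }
}

With split-update on, an UPDATE is written as a delete event (in a delete_delta_x_y directory) plus an insert event (in a delta_x_y directory) — the "U=D+I" noted in the tests below — which is what allows predicate pushdown into the insert deltas.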
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ecab0d07 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ecab0d07 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ecab0d07 Branch: refs/heads/master Commit: ecab0d072b50a8d85dca6e850e47425d96c1ac09 Parents: 333fa87 Author: Eugene Koifman <ekoif...@hortonworks.com> Authored: Fri Aug 12 10:31:39 2016 -0700 Committer: Eugene Koifman <ekoif...@hortonworks.com> Committed: Fri Aug 12 10:31:39 2016 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 57 +- .../mapreduce/FosterStorageHandler.java | 3 + .../streaming/AbstractRecordWriter.java | 7 + .../hive/ql/txn/compactor/TestCompactor.java | 318 +++++++++- metastore/if/hive_metastore.thrift | 1 + .../thrift/gen-cpp/hive_metastore_constants.cpp | 2 + .../thrift/gen-cpp/hive_metastore_constants.h | 1 + .../metastore/api/hive_metastoreConstants.java | 2 + .../src/gen/thrift/gen-php/metastore/Types.php | 5 + .../thrift/gen-py/hive_metastore/constants.py | 1 + .../thrift/gen-rb/hive_metastore_constants.rb | 2 + .../TransactionalValidationListener.java | 126 +++- .../apache/hadoop/hive/ql/exec/FetchTask.java | 1 + .../hadoop/hive/ql/exec/SMBMapJoinOperator.java | 1 + .../hadoop/hive/ql/exec/mr/MapredLocalTask.java | 1 + .../hadoop/hive/ql/io/AcidOutputFormat.java | 25 +- .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 456 +++++++++++++-- .../hadoop/hive/ql/io/HiveInputFormat.java | 1 + .../hadoop/hive/ql/io/orc/OrcInputFormat.java | 248 ++++++-- .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java | 152 ++++- .../apache/hadoop/hive/ql/metadata/Hive.java | 95 +-- .../hadoop/hive/ql/plan/TableScanDesc.java | 9 + .../hive/ql/txn/compactor/CompactorMR.java | 99 +++- .../apache/hadoop/hive/ql/TestTxnCommands2.java | 106 +++- .../ql/TestTxnCommands2WithSplitUpdate.java | 584 +++++++++++++++++++ .../apache/hadoop/hive/ql/io/TestAcidUtils.java | 275 ++++++++- .../hive/ql/io/orc/TestInputOutputFormat.java | 225 +++++-- 27 files changed, 2515 insertions(+), 288 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 3e9f6ec..0abb788 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -18,7 +18,32 @@ package org.apache.hadoop.hive.conf; -import com.google.common.base.Joiner; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintStream; +import java.io.UnsupportedEncodingException; +import java.net.URI; +import java.net.URL; +import java.net.URLDecoder; +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import javax.security.auth.login.LoginException; import 
org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -42,27 +67,7 @@ import org.apache.hive.common.HiveCompat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.security.auth.login.LoginException; - -import java.io.*; -import java.net.URI; -import java.net.URL; -import java.net.URLDecoder; -import java.net.URLEncoder; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; +import com.google.common.base.Joiner; /** * Hive Configuration. @@ -261,6 +266,7 @@ public class HiveConf extends Configuration { HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, HiveConf.ConfVars.HIVE_TXN_MANAGER, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, + HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES, HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE, HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH, HiveConf.ConfVars.HIVE_TXN_RETRYABLE_SQLEX_REGEX, @@ -1762,6 +1768,13 @@ public class HiveConf extends Configuration { " of the lock manager is dumped to log file. This is for debugging. See also " + "hive.lock.numretries and hive.lock.sleep.between.retries."), + HIVE_TXN_OPERATIONAL_PROPERTIES("hive.txn.operational.properties", 0, + "Sets the operational properties that control the appropriate behavior for various\n" + + "versions of the Hive ACID subsystem. Setting it to zero will turn on the legacy mode\n" + + "for ACID, while setting it to one will enable a split-update feature found in the newer\n" + + "version of the Hive ACID subsystem. It is mostly intended to be used as an internal\n" + + "property for future versions of ACID. (See HIVE-14035 for details.)"), + HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions.
If \n" + "current open transactions reach this limit, future open transaction requests will be \n" + "rejected, until this number goes below the limit."), http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java ---------------------------------------------------------------------- diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java index 14f7316..b970153 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java @@ -133,6 +133,9 @@ public class FosterStorageHandler extends DefaultStorageHandler { boolean isAcidTable = AcidUtils.isTablePropertyTransactional(tableProperties); AcidUtils.setTransactionalTableScan(jobProperties, isAcidTable); + AcidUtils.AcidOperationalProperties acidOperationalProperties = + AcidUtils.getAcidOperationalProperties(tableProperties); + AcidUtils.setAcidOperationalProperties(jobProperties, acidOperationalProperties); } } catch (IOException e) { throw new IllegalStateException("Failed to set output path", e); http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java index 974c6b8..b8615cb 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java @@ -48,6 +48,7 @@ import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Properties; public abstract class AbstractRecordWriter implements RecordWriter { @@ -265,10 +266,16 @@ public abstract class AbstractRecordWriter implements RecordWriter { private RecordUpdater createRecordUpdater(int bucketId, Long minTxnId, Long maxTxnID) throws IOException, SerializationError { try { + // Initialize table properties from the table parameters. This is required because the table + // may define certain table parameters that may be required while writing. The table parameter + // 'transactional_properties' is one such example. 
+ Properties tblProperties = new Properties(); + tblProperties.putAll(tbl.getParameters()); return outf.getRecordUpdater(partitionPath, new AcidOutputFormat.Options(conf) .inspector(getSerde().getObjectInspector()) .bucket(bucketId) + .tableProperties(tblProperties) .minimumTransactionId(minTxnId) .maximumTransactionId(maxTxnID) .statementId(-1) http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index 731caa8..f81752f 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -17,6 +17,19 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -68,20 +81,6 @@ import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.TimeUnit; - /** */ public class TestCompactor { @@ -857,6 +856,297 @@ public class TestCompactor { } } + @Test + public void majorCompactWhileStreamingForSplitUpdate() throws Exception { + String dbName = "default"; + String tblName = "cws"; + List<String> colNames = Arrays.asList("a", "b"); + String columnNamesProperty = "a,b"; + String columnTypesProperty = "int:string"; + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + + " CLUSTERED BY(a) INTO 2 BUCKETS" + //currently ACID requires table to be bucketed + " STORED AS ORC TBLPROPERTIES ('transactional'='true', " + + "'transactional_properties'='default') ", driver); // this turns on split-update U=D+I + + HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null); + DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt); + StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); + try { + // Write a couple of batches + for (int i = 0; i < 2; i++) { + writeBatch(connection, writer, false); + } + + // Start a third batch, but don't close it. 
+ writeBatch(connection, writer, true); + + // Now, compact + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR)); + Worker t = new Worker(); + t.setThreadId((int) t.getId()); + t.setHiveConf(conf); + AtomicBoolean stop = new AtomicBoolean(true); + AtomicBoolean looped = new AtomicBoolean(); + t.init(stop, looped); + t.run(); + + // Find the location of the table + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + Table table = msClient.getTable(dbName, tblName); + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = + fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter); + if (1 != stat.length) { + Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat)); + } + String name = stat[0].getPath().getName(); + Assert.assertEquals(name, "base_0000004"); + checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L); + } finally { + connection.close(); + } + } + + @Test + public void testMinorCompactionForSplitUpdateWithInsertsAndDeletes() throws Exception { + String agentInfo = "UT_" + Thread.currentThread().getName(); + String dbName = "default"; + String tblName = "cws"; + List<String> colNames = Arrays.asList("a", "b"); + String columnNamesProperty = "a,b"; + String columnTypesProperty = "int:string"; + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed + " STORED AS ORC TBLPROPERTIES ('transactional'='true'," + + "'transactional_properties'='default')", driver); + + // Insert some data -> this will generate only insert deltas and no delete deltas: delta_1_1 + executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver); + + // Insert some data -> this will again generate only insert deltas and no delete deltas: delta_2_2 + executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver); + + // Delete some data -> this will generate only delete deltas and no insert deltas: delete_delta_3_3 + executeStatementOnDriver("DELETE FROM " + tblName +" WHERE a = 2", driver); + + // Now, compact -> Compaction produces a single range for both delta and delete delta + // That is, both delta and delete_deltas would be compacted into delta_1_3 and delete_delta_1_3 + // even though there are only two delta_1_1, delta_2_2 and one delete_delta_3_3. + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR)); + Worker t = new Worker(); + t.setThreadId((int) t.getId()); + t.setHiveConf(conf); + AtomicBoolean stop = new AtomicBoolean(true); + AtomicBoolean looped = new AtomicBoolean(); + t.init(stop, looped); + t.run(); + + // Find the location of the table + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + Table table = msClient.getTable(dbName, tblName); + FileSystem fs = FileSystem.get(conf); + + // Verify that we have got correct set of deltas. 
+ FileStatus[] stat = + fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter); + String[] deltas = new String[stat.length]; + Path minorCompactedDelta = null; + for (int i = 0; i < deltas.length; i++) { + deltas[i] = stat[i].getPath().getName(); + if (deltas[i].equals("delta_0000001_0000003")) { + minorCompactedDelta = stat[i].getPath(); + } + } + Arrays.sort(deltas); + String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000003", "delta_0000002_0000002_0000"}; + if (!Arrays.deepEquals(expectedDeltas, deltas)) { + Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas)); + } + checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 1L, 2L); + + // Verify that we have got correct set of delete_deltas. + FileStatus[] deleteDeltaStat = + fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deleteEventDeltaDirFilter); + String[] deleteDeltas = new String[deleteDeltaStat.length]; + Path minorCompactedDeleteDelta = null; + for (int i = 0; i < deleteDeltas.length; i++) { + deleteDeltas[i] = deleteDeltaStat[i].getPath().getName(); + if (deleteDeltas[i].equals("delete_delta_0000001_0000003")) { + minorCompactedDeleteDelta = deleteDeltaStat[i].getPath(); + } + } + Arrays.sort(deleteDeltas); + String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000003", "delete_delta_0000003_0000003_0000"}; + if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) { + Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas)); + } + checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 2L, 2L); + } + + @Test + public void testMinorCompactionForSplitUpdateWithOnlyInserts() throws Exception { + String agentInfo = "UT_" + Thread.currentThread().getName(); + String dbName = "default"; + String tblName = "cws"; + List<String> colNames = Arrays.asList("a", "b"); + String columnNamesProperty = "a,b"; + String columnTypesProperty = "int:string"; + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed + " STORED AS ORC TBLPROPERTIES ('transactional'='true'," + + "'transactional_properties'='default')", driver); + + // Insert some data -> this will generate only insert deltas and no delete deltas: delta_1_1 + executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver); + + // Insert some data -> this will again generate only insert deltas and no delete deltas: delta_2_2 + executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver); + + // Now, compact + // One important thing to note in this test is that minor compaction always produces + // delta_x_y and a counterpart delete_delta_x_y, even when there are no delete_delta events. + // Such a choice has been made to simplify processing of AcidUtils.getAcidState(). 
+ + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR)); + Worker t = new Worker(); + t.setThreadId((int) t.getId()); + t.setHiveConf(conf); + AtomicBoolean stop = new AtomicBoolean(true); + AtomicBoolean looped = new AtomicBoolean(); + t.init(stop, looped); + t.run(); + + // Find the location of the table + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + Table table = msClient.getTable(dbName, tblName); + FileSystem fs = FileSystem.get(conf); + + // Verify that we have got correct set of deltas. + FileStatus[] stat = + fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter); + String[] deltas = new String[stat.length]; + Path minorCompactedDelta = null; + for (int i = 0; i < deltas.length; i++) { + deltas[i] = stat[i].getPath().getName(); + if (deltas[i].equals("delta_0000001_0000002")) { + minorCompactedDelta = stat[i].getPath(); + } + } + Arrays.sort(deltas); + String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000002", "delta_0000002_0000002_0000"}; + if (!Arrays.deepEquals(expectedDeltas, deltas)) { + Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas)); + } + checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 1L, 2L); + + // Verify that we have got correct set of delete_deltas. + FileStatus[] deleteDeltaStat = + fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deleteEventDeltaDirFilter); + String[] deleteDeltas = new String[deleteDeltaStat.length]; + Path minorCompactedDeleteDelta = null; + for (int i = 0; i < deleteDeltas.length; i++) { + deleteDeltas[i] = deleteDeltaStat[i].getPath().getName(); + if (deleteDeltas[i].equals("delete_delta_0000001_0000002")) { + minorCompactedDeleteDelta = deleteDeltaStat[i].getPath(); + } + } + Arrays.sort(deleteDeltas); + String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000002"}; + if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) { + Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas)); + } + // There should be no rows in the delete_delta because there have been no delete events. + checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L); + } + + @Test + public void minorCompactWhileStreamingWithSplitUpdate() throws Exception { + String dbName = "default"; + String tblName = "cws"; + List<String> colNames = Arrays.asList("a", "b"); + String columnNamesProperty = "a,b"; + String columnTypesProperty = "int:string"; + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed + " STORED AS ORC TBLPROPERTIES ('transactional'='true'," + + "'transactional_properties'='default')", driver); + + HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null); + DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt); + StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); + try { + // Write a couple of batches + for (int i = 0; i < 2; i++) { + writeBatch(connection, writer, false); + } + + // Start a third batch, but don't close it. 
+ writeBatch(connection, writer, true); + + // Now, compact + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR)); + Worker t = new Worker(); + t.setThreadId((int) t.getId()); + t.setHiveConf(conf); + AtomicBoolean stop = new AtomicBoolean(true); + AtomicBoolean looped = new AtomicBoolean(); + t.init(stop, looped); + t.run(); + + // Find the location of the table + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + Table table = msClient.getTable(dbName, tblName); + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = + fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter); + String[] names = new String[stat.length]; + Path resultFile = null; + for (int i = 0; i < names.length; i++) { + names[i] = stat[i].getPath().getName(); + if (names[i].equals("delta_0000001_0000004")) { + resultFile = stat[i].getPath(); + } + } + Arrays.sort(names); + String[] expected = new String[]{"delta_0000001_0000002", + "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"}; + if (!Arrays.deepEquals(expected, names)) { + Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names)); + } + checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty, 0, 1L, 4L); + + // Verify that we have got correct set of delete_deltas also + FileStatus[] deleteDeltaStat = + fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deleteEventDeltaDirFilter); + String[] deleteDeltas = new String[deleteDeltaStat.length]; + Path minorCompactedDeleteDelta = null; + for (int i = 0; i < deleteDeltas.length; i++) { + deleteDeltas[i] = deleteDeltaStat[i].getPath().getName(); + if (deleteDeltas[i].equals("delete_delta_0000001_0000004")) { + minorCompactedDeleteDelta = deleteDeltaStat[i].getPath(); + } + } + Arrays.sort(deleteDeltas); + String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000004"}; + if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) { + Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas)); + } + // There should be no rows in the delete_delta because there have been no delete events. + checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L); + + } finally { + connection.close(); + } + } + /** * Users have the choice of specifying compaction related tblproperties either in CREATE TABLE * statement or in ALTER TABLE .. COMPACT statement. This tests both cases. 
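Before the metastore and generated thrift changes that follow, it may help to see the directory-naming convention the compactor tests above assert on. A minimal sketch, assuming the "%07d" transaction-id and "%04d" statement-id zero-padding implied by expected names such as delta_0000001_0000003 and delete_delta_0000003_0000003_0000 (the class AcidDirNameSketch and its main() driver are illustrative only, not part of this commit):

// Illustrative sketch only: reproduces the delta/delete_delta directory naming
// that the tests above assert on; padding widths are assumed from those names.
public class AcidDirNameSketch {
  static String subdir(String prefix, long min, long max) {
    return prefix + String.format("%07d", min) + "_" + String.format("%07d", max);
  }
  static String subdir(String prefix, long min, long max, int statementId) {
    return subdir(prefix, min, max) + "_" + String.format("%04d", statementId);
  }
  public static void main(String[] args) {
    // Per-statement directories written by individual transactions:
    System.out.println(subdir("delta_", 1, 1, 0));        // delta_0000001_0000001_0000
    System.out.println(subdir("delete_delta_", 3, 3, 0)); // delete_delta_0000003_0000003_0000
    // Minor compaction collapses statement ids into one txn range, producing a
    // delta and a counterpart delete_delta over the same range:
    System.out.println(subdir("delta_", 1, 3));           // delta_0000001_0000003
    System.out.println(subdir("delete_delta_", 1, 3));    // delete_delta_0000001_0000003
  }
}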
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/if/hive_metastore.thrift ---------------------------------------------------------------------- diff --git a/metastore/if/hive_metastore.thrift b/metastore/if/hive_metastore.thrift index a2e35b8..872c0f3 100755 --- a/metastore/if/hive_metastore.thrift +++ b/metastore/if/hive_metastore.thrift @@ -1475,5 +1475,6 @@ const string FILE_OUTPUT_FORMAT = "file.outputformat", const string META_TABLE_STORAGE = "storage_handler", const string TABLE_IS_TRANSACTIONAL = "transactional", const string TABLE_NO_AUTO_COMPACT = "no_auto_compaction", +const string TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties", http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp ---------------------------------------------------------------------- diff --git a/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp b/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp index f982bf2..1cbd176 100644 --- a/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp +++ b/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp @@ -57,6 +57,8 @@ hive_metastoreConstants::hive_metastoreConstants() { TABLE_NO_AUTO_COMPACT = "no_auto_compaction"; + TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties"; + } }}} // namespace http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h ---------------------------------------------------------------------- diff --git a/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h b/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h index ae14bd1..3d068c3 100644 --- a/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h +++ b/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h @@ -38,6 +38,7 @@ class hive_metastoreConstants { std::string META_TABLE_STORAGE; std::string TABLE_IS_TRANSACTIONAL; std::string TABLE_NO_AUTO_COMPACT; + std::string TABLE_TRANSACTIONAL_PROPERTIES; }; extern const hive_metastoreConstants g_hive_metastore_constants; http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java ---------------------------------------------------------------------- diff --git a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java index 5a666f2..8de8896 100644 --- a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java +++ b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java @@ -82,4 +82,6 @@ public class hive_metastoreConstants { public static final String TABLE_NO_AUTO_COMPACT = "no_auto_compaction"; + public static final String TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties"; + } http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/src/gen/thrift/gen-php/metastore/Types.php ---------------------------------------------------------------------- diff --git a/metastore/src/gen/thrift/gen-php/metastore/Types.php b/metastore/src/gen/thrift/gen-php/metastore/Types.php index d6f7f49..2f9cc9b 100644 --- a/metastore/src/gen/thrift/gen-php/metastore/Types.php +++ b/metastore/src/gen/thrift/gen-php/metastore/Types.php @@ -18865,6 +18865,7 @@ final class 
Constant extends \Thrift\Type\TConstant { static protected $META_TABLE_STORAGE; static protected $TABLE_IS_TRANSACTIONAL; static protected $TABLE_NO_AUTO_COMPACT; + static protected $TABLE_TRANSACTIONAL_PROPERTIES; static protected function init_DDL_TIME() { return "transient_lastDdlTime"; @@ -18957,6 +18958,10 @@ final class Constant extends \Thrift\Type\TConstant { static protected function init_TABLE_NO_AUTO_COMPACT() { return "no_auto_compaction"; } + + static protected function init_TABLE_TRANSACTIONAL_PROPERTIES() { + return "transactional_properties"; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/src/gen/thrift/gen-py/hive_metastore/constants.py ---------------------------------------------------------------------- diff --git a/metastore/src/gen/thrift/gen-py/hive_metastore/constants.py b/metastore/src/gen/thrift/gen-py/hive_metastore/constants.py index d1c07a5..5100236 100644 --- a/metastore/src/gen/thrift/gen-py/hive_metastore/constants.py +++ b/metastore/src/gen/thrift/gen-py/hive_metastore/constants.py @@ -32,3 +32,4 @@ FILE_OUTPUT_FORMAT = "file.outputformat" META_TABLE_STORAGE = "storage_handler" TABLE_IS_TRANSACTIONAL = "transactional" TABLE_NO_AUTO_COMPACT = "no_auto_compaction" +TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties" http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb ---------------------------------------------------------------------- diff --git a/metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb b/metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb index eeccc84..6aa7143 100644 --- a/metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb +++ b/metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb @@ -53,3 +53,5 @@ TABLE_IS_TRANSACTIONAL = %q"transactional" TABLE_NO_AUTO_COMPACT = %q"no_auto_compaction" +TABLE_TRANSACTIONAL_PROPERTIES = %q"transactional_properties" + http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java index 3e74675..0f08f43 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java @@ -17,25 +17,35 @@ */ package org.apache.hadoop.hive.metastore; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.events.PreAlterTableEvent; import org.apache.hadoop.hive.metastore.events.PreCreateTableEvent; import org.apache.hadoop.hive.metastore.events.PreEventContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -final class 
TransactionalValidationListener extends MetaStorePreEventListener { +public final class TransactionalValidationListener extends MetaStorePreEventListener { public static final Logger LOG = LoggerFactory.getLogger(TransactionalValidationListener.class); + // These constants are also imported by org.apache.hadoop.hive.ql.io.AcidUtils. + public static final String DEFAULT_TRANSACTIONAL_PROPERTY = "default"; + public static final String LEGACY_TRANSACTIONAL_PROPERTY = "legacy"; + TransactionalValidationListener(Configuration conf) { super(conf); } + @Override public void onEvent(PreEventContext context) throws MetaException, NoSuchObjectException, InvalidOperationException { switch (context.getEventType()) { @@ -60,6 +70,8 @@ final class TransactionalValidationListener extends MetaStorePreEventListener { /** * once a table is marked transactional, you cannot go back. Enforce this. + * Also in current version, 'transactional_properties' of the table cannot be altered after + * the table is created. Any attempt to alter it will throw a MetaException. */ private void handleAlterTableTransactionalProp(PreAlterTableEvent context) throws MetaException { Table newTable = context.getNewTable(); @@ -70,12 +82,22 @@ final class TransactionalValidationListener extends MetaStorePreEventListener { Set<String> keys = new HashSet<>(parameters.keySet()); String transactionalValue = null; boolean transactionalValuePresent = false; + boolean isTransactionalPropertiesPresent = false; + String transactionalPropertiesValue = null; + boolean hasValidTransactionalValue = false; + for (String key : keys) { if(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.equalsIgnoreCase(key)) { transactionalValuePresent = true; transactionalValue = parameters.get(key); parameters.remove(key); } + if(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES.equalsIgnoreCase(key)) { + isTransactionalPropertiesPresent = true; + transactionalPropertiesValue = parameters.get(key); + // Do not remove the parameter yet, because we have separate initialization routine + // that will use it down below. + } } if (transactionalValuePresent) { //normalize prop name @@ -91,24 +113,52 @@ final class TransactionalValidationListener extends MetaStorePreEventListener { throw new MetaException(newTable.getDbName() + "." + newTable.getTableName() + " cannot be declared transactional because it's an external table"); } - - return; + hasValidTransactionalValue = true; } + Table oldTable = context.getOldTable(); String oldTransactionalValue = null; + String oldTransactionalPropertiesValue = null; for (String key : oldTable.getParameters().keySet()) { if (hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.equalsIgnoreCase(key)) { oldTransactionalValue = oldTable.getParameters().get(key); } + if (hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES.equalsIgnoreCase(key)) { + oldTransactionalPropertiesValue = oldTable.getParameters().get(key); + } } + + if (oldTransactionalValue == null ? transactionalValue == null : oldTransactionalValue.equalsIgnoreCase(transactionalValue)) { //this covers backward compat cases where this prop may have been set already - return; + hasValidTransactionalValue = true; + } + + if (!hasValidTransactionalValue) { + // if here, there is attempt to set transactional to something other than 'true' + // and NOT the same value it was before + throw new MetaException("TBLPROPERTIES with 'transactional'='true' cannot be unset"); + } + + if (isTransactionalPropertiesPresent) { + // Now validate transactional_properties for the table. 
+ if (oldTransactionalValue == null) { + // If this is the first time the table is being initialized to 'transactional=true', + // any valid value can be set for the 'transactional_properties'. + initializeTransactionalProperties(newTable); + } else { + // If the table was already marked as 'transactional=true', then the new value of + // 'transactional_properties' must match the old value. Any attempt to alter the previous + // value will throw an error. An exception will still be thrown if the previous value was + // null and an attempt is made to set it. This behaviour can be changed in the future. + if (oldTransactionalPropertiesValue == null + || !oldTransactionalPropertiesValue.equalsIgnoreCase(transactionalPropertiesValue) ) { + throw new MetaException("TBLPROPERTIES with 'transactional_properties' cannot be " + + "altered after the table is created"); + } + } } - // if here, there is attempt to set transactional to something other than 'true' - // and NOT the same value it was before - throw new MetaException("TBLPROPERTIES with 'transactional'='true' cannot be unset"); } /** @@ -157,6 +207,7 @@ final class TransactionalValidationListener extends MetaStorePreEventListener { // normalize prop name parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.TRUE.toString()); + initializeTransactionalProperties(newTable); return; } @@ -187,4 +238,53 @@ final class TransactionalValidationListener extends MetaStorePreEventListener { return true; } -} \ No newline at end of file + + private void initializeTransactionalProperties(Table table) throws MetaException { + // All new versions of Acid tables created after the introduction of Acid version/type system + // can have TRANSACTIONAL_PROPERTIES property defined. This parameter can be used to change + // the operational behavior of ACID. However if this parameter is not defined, the new Acid + // tables will still behave as the old ones. This is done so to preserve the behavior + // in case of rolling downgrade. + + // Initialize transaction table properties with default string value. + String tableTransactionalProperties = null; + + Map<String, String> parameters = table.getParameters(); + if (parameters != null) { + Set<String> keys = parameters.keySet(); + for (String key : keys) { + if (hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES.equalsIgnoreCase(key)) { + tableTransactionalProperties = parameters.get(key).toLowerCase(); + parameters.remove(key); + String validationError = validateTransactionalProperties(tableTransactionalProperties); + if (validationError != null) { + throw new MetaException("Invalid transactional properties specified for the " + + "table with the error " + validationError); + } + break; + } + } + } + + if (tableTransactionalProperties != null) { + parameters.put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, + tableTransactionalProperties); + } + } + + private String validateTransactionalProperties(String transactionalProperties) { + boolean isValid = false; + switch (transactionalProperties) { + case DEFAULT_TRANSACTIONAL_PROPERTY: + case LEGACY_TRANSACTIONAL_PROPERTY: + isValid = true; + break; + default: + isValid = false; + } + if (!isValid) { + return "unknown value " + transactionalProperties + " for transactional_properties"; + } + return null; // All checks passed, return null. 
+ } +} http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java index db6848a..8c7d99d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java @@ -81,6 +81,7 @@ public class FetchTask extends Task<FetchWork> implements Serializable { HiveInputFormat.pushFilters(job, ts); AcidUtils.setTransactionalTableScan(job, ts.getConf().isAcidTable()); + AcidUtils.setAcidOperationalProperties(job, ts.getConf().getAcidOperationalProperties()); } sink = work.getSink(); fetch = new FetchOperator(work, job, source, getVirtualColumns(source)); http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java index 57b6c67..584eff4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java @@ -212,6 +212,7 @@ public class SMBMapJoinOperator extends AbstractMapJoinOperator<SMBJoinDesc> imp HiveInputFormat.pushFilters(jobClone, ts); AcidUtils.setTransactionalTableScan(jobClone, ts.getConf().isAcidTable()); + AcidUtils.setAcidOperationalProperties(jobClone, ts.getConf().getAcidOperationalProperties()); ts.passExecContext(getExecContext()); http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java index 26e6443..ac922ce 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java @@ -472,6 +472,7 @@ public class MapredLocalTask extends Task<MapredLocalWork> implements Serializab HiveInputFormat.pushFilters(jobClone, ts); AcidUtils.setTransactionalTableScan(jobClone, ts.getConf().isAcidTable()); + AcidUtils.setAcidOperationalProperties(jobClone, ts.getConf().getAcidOperationalProperties()); // create a fetch operator FetchOperator fetchOp = new FetchOperator(entry.getValue(), jobClone); http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java index dd90a95..b85b827 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java @@ -18,6 +18,10 @@ package org.apache.hadoop.hive.ql.io; +import java.io.IOException; +import java.io.PrintStream; +import java.util.Properties; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -26,10 +30,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import 
org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.Reporter; -import java.io.IOException; -import java.io.PrintStream; -import java.util.Properties; - /** * An extension for OutputFormats that want to implement ACID transactions. * @param <V> the row type of the file @@ -44,6 +44,7 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO private FileSystem fs; private ObjectInspector inspector; private boolean writingBase = false; + private boolean writingDeleteDelta = false; private boolean isCompressed = false; private Properties properties; private Reporter reporter; @@ -98,6 +99,16 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO } /** + * Is this writing a delete delta directory? + * @param val is this a delete delta file? + * @return this + */ + public Options writingDeleteDelta(boolean val) { + this.writingDeleteDelta = val; + return this; + } + + /** * Provide a file system to the writer. Otherwise, the filesystem for the * path will be used. * @param fs the file system that corresponds to the the path @@ -223,7 +234,7 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO this.finalDestination = p; return this; } - + public Configuration getConfiguration() { return configuration; } @@ -260,6 +271,10 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO return writingBase; } + public boolean isWritingDeleteDelta() { + return writingDeleteDelta; + } + public int getBucket() { return bucket; } http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 449d889..cda5f39 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -18,10 +18,14 @@ package org.apache.hadoop.hive.ql.io; -import org.apache.hadoop.hive.metastore.api.DataOperationType; -import org.apache.hadoop.hive.ql.ErrorMsg; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.regex.Pattern; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -30,22 +34,19 @@ import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.TransactionalValidationListener; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.shims.HadoopShims; -import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import 
java.util.Map; -import java.util.Properties; -import java.util.regex.Pattern; - /** * Utilities that are shared by all of the ACID input and output formats. They * are used by the compactor and cleaner and thus must be format agnostic. @@ -61,6 +62,7 @@ public class AcidUtils { } }; public static final String DELTA_PREFIX = "delta_"; + public static final String DELETE_DELTA_PREFIX = "delete_delta_"; public static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length"; public static final PathFilter deltaFileFilter = new PathFilter() { @Override @@ -68,6 +70,12 @@ public class AcidUtils { return path.getName().startsWith(DELTA_PREFIX); } }; + public static final PathFilter deleteEventDeltaDirFilter = new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().startsWith(DELETE_DELTA_PREFIX); + } + }; public static final String BUCKET_PREFIX = "bucket_"; public static final PathFilter bucketFileFilter = new PathFilter() { @Override @@ -142,6 +150,25 @@ public class AcidUtils { return deltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId); } + /** + * This is the format of the delete delta dir name prior to Hive 2.2.x + */ + @VisibleForTesting + static String deleteDeltaSubdir(long min, long max) { + return DELETE_DELTA_PREFIX + String.format(DELTA_DIGITS, min) + "_" + + String.format(DELTA_DIGITS, max); + } + + /** + * Each write statement in a transaction creates its own delete delta dir + * when the split-update acid operational property is turned on. + * @since 2.2.x + */ + @VisibleForTesting + static String deleteDeltaSubdir(long min, long max, int statementId) { + return deleteDeltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId); + } + public static String baseDir(long txnId) { return BASE_PREFIX + String.format(DELTA_DIGITS, txnId); } @@ -163,12 +190,19 @@ public class AcidUtils { } else if(options.getStatementId() == -1) { //when minor compaction runs, we collapse per statement delta files inside a single //transaction so we no longer need a statementId in the file name - subdir = deltaSubdir(options.getMinimumTransactionId(), - options.getMaximumTransactionId()); + subdir = options.isWritingDeleteDelta() ?
+ deleteDeltaSubdir(options.getMinimumTransactionId(), + options.getMaximumTransactionId(), + options.getStatementId()) + : deltaSubdir(options.getMinimumTransactionId(), + options.getMaximumTransactionId(), + options.getStatementId()); } return createBucketFile(new Path(directory, subdir), options.getBucket()); } @@ -195,11 +229,10 @@ public class AcidUtils { * @return the options used to create that filename */ public static AcidOutputFormat.Options - parseBaseBucketFilename(Path bucketFile, - Configuration conf) { + parseBaseOrDeltaBucketFilename(Path bucketFile, + Configuration conf) { AcidOutputFormat.Options result = new AcidOutputFormat.Options(conf); String filename = bucketFile.getName(); - result.writingBase(true); if (ORIGINAL_PATTERN.matcher(filename).matches()) { int bucket = Integer.parseInt(filename.substring(0, filename.indexOf('_'))); @@ -207,15 +240,33 @@ public class AcidUtils { .setOldStyle(true) .minimumTransactionId(0) .maximumTransactionId(0) - .bucket(bucket); + .bucket(bucket) + .writingBase(true); } else if (filename.startsWith(BUCKET_PREFIX)) { int bucket = Integer.parseInt(filename.substring(filename.indexOf('_') + 1)); - result - .setOldStyle(false) - .minimumTransactionId(0) - .maximumTransactionId(parseBase(bucketFile.getParent())) - .bucket(bucket); + if (bucketFile.getParent().getName().startsWith(BASE_PREFIX)) { + result + .setOldStyle(false) + .minimumTransactionId(0) + .maximumTransactionId(parseBase(bucketFile.getParent())) + .bucket(bucket) + .writingBase(true); + } else if (bucketFile.getParent().getName().startsWith(DELTA_PREFIX)) { + ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELTA_PREFIX); + result + .setOldStyle(false) + .minimumTransactionId(parsedDelta.minTransaction) + .maximumTransactionId(parsedDelta.maxTransaction) + .bucket(bucket); + } else if (bucketFile.getParent().getName().startsWith(DELETE_DELTA_PREFIX)) { + ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELETE_DELTA_PREFIX); + result + .setOldStyle(false) + .minimumTransactionId(parsedDelta.minTransaction) + .maximumTransactionId(parsedDelta.maxTransaction) + .bucket(bucket); + } } else { result.setOldStyle(true).bucket(-1).minimumTransactionId(0) .maximumTransactionId(0); @@ -248,6 +299,179 @@ public class AcidUtils { } } + public enum AcidBaseFileType { + COMPACTED_BASE, // a regular base file generated through major compaction + ORIGINAL_BASE, // a non-acid schema file for tables that got converted to acid + INSERT_DELTA; // a delta file with only insert events that can be treated as base for split-update + } + + /** + * A simple wrapper class that stores the information about a base file and its type. + * Orc splits can be generated on three kinds of base files: an original file (non-acid converted + * files), a regular base file (created by major compaction) or an insert delta (which can be + * treated as a base when split-update is enabled for acid). 
+ */ + public static class AcidBaseFileInfo { + final private HdfsFileStatusWithId fileId; + final private AcidBaseFileType acidBaseFileType; + + public AcidBaseFileInfo(HdfsFileStatusWithId fileId, AcidBaseFileType acidBaseFileType) { + this.fileId = fileId; + this.acidBaseFileType = acidBaseFileType; + } + + public boolean isCompactedBase() { + return this.acidBaseFileType == AcidBaseFileType.COMPACTED_BASE; + } + + public boolean isOriginal() { + return this.acidBaseFileType == AcidBaseFileType.ORIGINAL_BASE; + } + + public boolean isInsertDelta() { + return this.acidBaseFileType == AcidBaseFileType.INSERT_DELTA; + } + + public HdfsFileStatusWithId getHdfsFileStatusWithId() { + return this.fileId; + } + } + + public static class AcidOperationalProperties { + private int description = 0x00; + public static final int SPLIT_UPDATE_BIT = 0x01; + public static final String SPLIT_UPDATE_STRING = "split_update"; + public static final int HASH_BASED_MERGE_BIT = 0x02; + public static final String HASH_BASED_MERGE_STRING = "hash_merge"; + public static final String DEFAULT_VALUE_STRING = TransactionalValidationListener.DEFAULT_TRANSACTIONAL_PROPERTY; + public static final String LEGACY_VALUE_STRING = TransactionalValidationListener.LEGACY_TRANSACTIONAL_PROPERTY; + + private AcidOperationalProperties() { + } + + /** + * Returns an acidOperationalProperties object that represents ACID behavior for legacy tables + * that were created before the ACID type system using operational properties was put in place. + * @return the acidOperationalProperties object + */ + public static AcidOperationalProperties getLegacy() { + AcidOperationalProperties obj = new AcidOperationalProperties(); + // In legacy mode, none of these properties are turned on. + return obj; + } + + /** + * Returns an acidOperationalProperties object that represents default ACID behavior for tables + * that do not explicitly specify/override the default behavior. + * @return the acidOperationalProperties object. + */ + public static AcidOperationalProperties getDefault() { + AcidOperationalProperties obj = new AcidOperationalProperties(); + obj.setSplitUpdate(true); + obj.setHashBasedMerge(false); + return obj; + } + + /** + * Returns an acidOperationalProperties object that is represented by an encoded string. + * @param propertiesStr an encoded string representing the acidOperationalProperties. + * @return the acidOperationalProperties object. + */ + public static AcidOperationalProperties parseString(String propertiesStr) { + if (propertiesStr == null) { + return AcidOperationalProperties.getLegacy(); + } + if (propertiesStr.equalsIgnoreCase(DEFAULT_VALUE_STRING)) { + return AcidOperationalProperties.getDefault(); + } + if (propertiesStr.equalsIgnoreCase(LEGACY_VALUE_STRING)) { + return AcidOperationalProperties.getLegacy(); + } + AcidOperationalProperties obj = new AcidOperationalProperties(); + String[] options = propertiesStr.split("\\|"); + for (String option : options) { + if (option.trim().length() == 0) continue; // ignore empty strings + switch (option) { + case SPLIT_UPDATE_STRING: + obj.setSplitUpdate(true); + break; + case HASH_BASED_MERGE_STRING: + obj.setHashBasedMerge(true); + break; + default: + throw new IllegalArgumentException( + "Unexpected value " + option + " for ACID operational properties!"); + } + } + return obj; + } + + /** + * Returns an acidOperationalProperties object that is represented by an encoded 32-bit integer. + * @param properties an encoded 32-bit integer representing the acidOperationalProperties.
+ * @return the acidOperationalProperties object. + */ + public static AcidOperationalProperties parseInt(int properties) { + AcidOperationalProperties obj = new AcidOperationalProperties(); + if ((properties & SPLIT_UPDATE_BIT) > 0) { + obj.setSplitUpdate(true); + } + if ((properties & HASH_BASED_MERGE_BIT) > 0) { + obj.setHashBasedMerge(true); + } + return obj; + } + + /** + * Sets the split update property for ACID operations based on the boolean argument. + * When split update is turned on, an update ACID event is interpreted as a combination of + * delete event followed by an update event. + * @param isSplitUpdate a boolean property that turns on split update when true. + * @return the acidOperationalProperties object. + */ + public AcidOperationalProperties setSplitUpdate(boolean isSplitUpdate) { + description = (isSplitUpdate + ? (description | SPLIT_UPDATE_BIT) : (description & ~SPLIT_UPDATE_BIT)); + return this; + } + + /** + * Sets the hash-based merge property for ACID operations that combines delta files using + * GRACE hash join based approach, when turned on. (Currently unimplemented!) + * @param isHashBasedMerge a boolean property that turns on hash-based merge when true. + * @return the acidOperationalProperties object. + */ + public AcidOperationalProperties setHashBasedMerge(boolean isHashBasedMerge) { + description = (isHashBasedMerge + ? (description | HASH_BASED_MERGE_BIT) : (description & ~HASH_BASED_MERGE_BIT)); + return this; + } + + public boolean isSplitUpdate() { + return (description & SPLIT_UPDATE_BIT) > 0; + } + + public boolean isHashBasedMerge() { + return (description & HASH_BASED_MERGE_BIT) > 0; + } + + public int toInt() { + return description; + } + + @Override + public String toString() { + StringBuilder str = new StringBuilder(); + if (isSplitUpdate()) { + str.append("|" + SPLIT_UPDATE_STRING); + } + if (isHashBasedMerge()) { + str.append("|" + HASH_BASED_MERGE_STRING); + } + return str.toString(); + } + } + public static interface Directory { /** @@ -287,18 +511,20 @@ public class AcidUtils { //-1 is for internal (getAcidState()) purposes and means the delta dir //had no statement ID private final int statementId; + private final boolean isDeleteDelta; // records whether delta dir is of type 'delete_delta_x_y...' /** * for pre 1.3.x delta files */ - ParsedDelta(long min, long max, FileStatus path) { - this(min, max, path, -1); + ParsedDelta(long min, long max, FileStatus path, boolean isDeleteDelta) { + this(min, max, path, -1, isDeleteDelta); } - ParsedDelta(long min, long max, FileStatus path, int statementId) { + ParsedDelta(long min, long max, FileStatus path, int statementId, boolean isDeleteDelta) { this.minTransaction = min; this.maxTransaction = max; this.path = path; this.statementId = statementId; + this.isDeleteDelta = isDeleteDelta; } public long getMinTransaction() { @@ -317,6 +543,10 @@ public class AcidUtils { return statementId == -1 ? 
   public static interface Directory {
 
     /**
@@ -287,18 +511,20 @@ public class AcidUtils {
     //-1 is for internal (getAcidState()) purposes and means the delta dir
     //had no statement ID
     private final int statementId;
+    private final boolean isDeleteDelta; // records whether delta dir is of type 'delete_delta_x_y...'
 
     /**
      * for pre 1.3.x delta files
      */
-    ParsedDelta(long min, long max, FileStatus path) {
-      this(min, max, path, -1);
+    ParsedDelta(long min, long max, FileStatus path, boolean isDeleteDelta) {
+      this(min, max, path, -1, isDeleteDelta);
     }
-    ParsedDelta(long min, long max, FileStatus path, int statementId) {
+    ParsedDelta(long min, long max, FileStatus path, int statementId, boolean isDeleteDelta) {
       this.minTransaction = min;
       this.maxTransaction = max;
       this.path = path;
       this.statementId = statementId;
+      this.isDeleteDelta = isDeleteDelta;
     }
 
     public long getMinTransaction() {
@@ -317,6 +543,10 @@ public class AcidUtils {
       return statementId == -1 ? 0 : statementId;
     }
 
+    public boolean isDeleteDelta() {
+      return isDeleteDelta;
+    }
+
     /**
      * Compactions (Major/Minor) merge deltas/bases but delete of old files
      * happens in a different process; thus it's possible to have bases/deltas with
@@ -418,15 +648,49 @@ public class AcidUtils {
     return results.toArray(new Path[results.size()]);
   }
 
-  private static ParsedDelta parseDelta(FileStatus path) {
-    ParsedDelta p = parsedDelta(path.getPath());
-    return new ParsedDelta(p.getMinTransaction(),
-        p.getMaxTransaction(), path, p.statementId);
+  /**
+   * Convert the list of begin/end transaction id pairs to a list of delete delta
+   * directories. Note that starting with 2.2.x there may be multiple delete_delta files
+   * for the exact same txn range;
+   * see {@link org.apache.hadoop.hive.ql.io.AcidUtils#deltaSubdir(long, long, int)}
+   * @param root the root directory
+   * @param deleteDeltas list of begin/end transaction id pairs
+   * @return the list of delete delta paths
+   */
+  public static Path[] deserializeDeleteDeltas(Path root, final List<AcidInputFormat.DeltaMetaData> deleteDeltas) throws IOException {
+    List<Path> results = new ArrayList<Path>(deleteDeltas.size());
+    for(AcidInputFormat.DeltaMetaData dmd : deleteDeltas) {
+      if(dmd.getStmtIds().isEmpty()) {
+        results.add(new Path(root, deleteDeltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId())));
+        continue;
+      }
+      for(Integer stmtId : dmd.getStmtIds()) {
+        results.add(new Path(root, deleteDeltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId(), stmtId)));
+      }
+    }
+    return results.toArray(new Path[results.size()]);
+  }
+
+  public static ParsedDelta parsedDelta(Path deltaDir) {
+    String deltaDirName = deltaDir.getName();
+    if (deltaDirName.startsWith(DELETE_DELTA_PREFIX)) {
+      return parsedDelta(deltaDir, DELETE_DELTA_PREFIX);
+    }
+    return parsedDelta(deltaDir, DELTA_PREFIX); // default to DELTA_PREFIX
+  }
+
+  private static ParsedDelta parseDelta(FileStatus path, String deltaPrefix) {
+    ParsedDelta p = parsedDelta(path.getPath(), deltaPrefix);
+    boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX);
+    return new ParsedDelta(p.getMinTransaction(),
+        p.getMaxTransaction(), path, p.statementId, isDeleteDelta);
+  }
+
+  public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix) {
     String filename = deltaDir.getName();
-    if (filename.startsWith(DELTA_PREFIX)) {
-      String rest = filename.substring(DELTA_PREFIX.length());
+    boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX);
+    if (filename.startsWith(deltaPrefix)) {
+      String rest = filename.substring(deltaPrefix.length());
       int split = rest.indexOf('_');
       int split2 = rest.indexOf('_', split + 1);//may be -1 if no statementId
       long min = Long.parseLong(rest.substring(0, split));
@@ -434,13 +698,13 @@ public class AcidUtils {
       long max = split2 == -1 ?
           Long.parseLong(rest.substring(split + 1)) :
           Long.parseLong(rest.substring(split + 1, split2));
       if(split2 == -1) {
-        return new ParsedDelta(min, max, null);
+        return new ParsedDelta(min, max, null, isDeleteDelta);
       }
       int statementId = Integer.parseInt(rest.substring(split2 + 1));
-      return new ParsedDelta(min, max, null, statementId);
+      return new ParsedDelta(min, max, null, statementId, isDeleteDelta);
     }
     throw new IllegalArgumentException(deltaDir + " does not start with " +
-        DELTA_PREFIX);
+        deltaPrefix);
   }
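For illustration, a sketch of how deserializeDeleteDeltas() fans out one metadata entry into per-statement directories. The root path is made up, and this assumes DeltaMetaData exposes a (minTxnId, maxTxnId, stmtIds) constructor alongside the getters used above:

    // Hypothetical sketch: one DeltaMetaData entry covering txns 40-44 with two
    // statement ids expands to two delete_delta directories under the root.
    // requires: import org.apache.hadoop.fs.Path; import java.util.*;
    List<AcidInputFormat.DeltaMetaData> metas = Collections.singletonList(
        new AcidInputFormat.DeltaMetaData(40, 44, Arrays.asList(0, 1)));
    Path[] dirs = AcidUtils.deserializeDeleteDeltas(new Path("/warehouse/t"), metas);
    // dirs now holds one delete_delta path per statement id for the 40-44 range.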
 
   /**
@@ -456,7 +720,8 @@ public class AcidUtils {
     for(FileStatus file: fs.listStatus(directory)) {
       String filename = file.getPath().getName();
       if (filename.startsWith(BASE_PREFIX) ||
-          filename.startsWith(DELTA_PREFIX)) {
+          filename.startsWith(DELTA_PREFIX) ||
+          filename.startsWith(DELETE_DELTA_PREFIX)) {
         if (file.isDir()) {
           return true;
         }
@@ -499,6 +764,7 @@ public class AcidUtils {
                                        boolean ignoreEmptyFiles
                                        ) throws IOException {
     FileSystem fs = directory.getFileSystem(conf);
+    // The following 'deltas' includes all kinds of delta files, both insert and delete deltas.
     final List<ParsedDelta> deltas = new ArrayList<ParsedDelta>();
     List<ParsedDelta> working = new ArrayList<ParsedDelta>();
     List<FileStatus> originalDirectories = new ArrayList<FileStatus>();
@@ -553,6 +819,7 @@ public class AcidUtils {
     //subject to list of 'exceptions' in 'txnList' (not shown in above example).
     long current = bestBase.txn;
     int lastStmtId = -1;
+    ParsedDelta prev = null;
     for(ParsedDelta next: working) {
       if (next.maxTransaction > current) {
         // are any of the new transactions ones that we care about?
@@ -561,6 +828,7 @@ public class AcidUtils {
           deltas.add(next);
           current = next.maxTransaction;
           lastStmtId = next.statementId;
+          prev = next;
         }
       }
       else if(next.maxTransaction == current && lastStmtId >= 0) {
@@ -568,6 +836,24 @@ public class AcidUtils {
        //generate multiple delta files with the same txnId range
        //of course, if maxTransaction has already been minor compacted, all per statement deltas are obsolete
        deltas.add(next);
+        prev = next;
+      }
+      else if (prev != null && next.maxTransaction == prev.maxTransaction
+          && next.minTransaction == prev.minTransaction
+          && next.statementId == prev.statementId) {
+        // The 'next' parsedDelta may have everything equal to the 'prev' parsedDelta, except
+        // the path. This may happen when we have split update and thus two types of delta
+        // directories, 'delta_x_y' and 'delete_delta_x_y', for the SAME txn range.
+
+        // Also note that any delete_deltas in between a given delta_x_y range would be made
+        // obsolete. For example, a delta_30_50 would make delete_delta_40_40 obsolete.
+        // This is valid because minor compaction always compacts the normal deltas and the delete
+        // deltas for the same range. That is, if we had 3 directories, delta_30_30,
+        // delete_delta_40_40 and delta_50_50, then running minor compaction would produce
+        // delta_30_50 and delete_delta_30_50.
+
+        deltas.add(next);
+        prev = next;
       }
       else {
         obsolete.add(next.path);
@@ -638,7 +924,8 @@ public class AcidUtils {
   }
 
   private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId,
       ValidTxnList txnList, List<ParsedDelta> working, List<FileStatus> originalDirectories,
-      List<HdfsFileStatusWithId> original, List<FileStatus> obsolete, TxnBase bestBase, boolean ignoreEmptyFiles) {
+      List<HdfsFileStatusWithId> original, List<FileStatus> obsolete, TxnBase bestBase,
+      boolean ignoreEmptyFiles) throws IOException {
     Path p = child.getPath();
     String fn = p.getName();
     if (fn.startsWith(BASE_PREFIX) && child.isDir()) {
@@ -662,8 +949,11 @@ public class AcidUtils {
       } else {
         obsolete.add(child);
       }
-    } else if (fn.startsWith(DELTA_PREFIX) && child.isDir()) {
-      ParsedDelta delta = parseDelta(child);
+    } else if ((fn.startsWith(DELTA_PREFIX) || fn.startsWith(DELETE_DELTA_PREFIX))
+        && child.isDir()) {
+      String deltaPrefix =
+          (fn.startsWith(DELTA_PREFIX)) ? DELTA_PREFIX : DELETE_DELTA_PREFIX;
+      ParsedDelta delta = parseDelta(child, deltaPrefix);
       if (txnList.isTxnRangeValid(delta.minTransaction,
           delta.maxTransaction) !=
           ValidTxnList.RangeResponse.NONE) {
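The prev/next comparison above is easiest to see on a concrete pair: under split update the same txn range legitimately appears twice, once per directory flavor, differing only in path and delta type. A hypothetical sketch (directory names made up):

    // Same txn range, different directory type: only the path and the
    // isDeleteDelta flag differ, which is exactly the case the prev/next
    // comparison keeps instead of marking obsolete.
    // requires: import org.apache.hadoop.fs.Path;
    AcidUtils.ParsedDelta insertSide =
        AcidUtils.parsedDelta(new Path("/warehouse/t/delta_0000040_0000044"));
    AcidUtils.ParsedDelta deleteSide =
        AcidUtils.parsedDelta(new Path("/warehouse/t/delete_delta_0000040_0000044"));
    assert insertSide.getMinTransaction() == deleteSide.getMinTransaction();
    assert insertSide.getMaxTransaction() == deleteSide.getMaxTransaction();
    assert insertSide.isDeleteDelta() != deleteSide.isDeleteDelta();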
@@ -791,4 +1081,84 @@ public class AcidUtils {
     return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
   }
+
+  /**
+   * Sets the acidOperationalProperties in the configuration object argument.
+   * @param conf Mutable configuration object
+   * @param properties An acidOperationalProperties object to initialize from.
+   */
+  public static void setAcidOperationalProperties(Configuration conf,
+      AcidOperationalProperties properties) {
+    if (properties != null) {
+      HiveConf.setIntVar(conf, ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES, properties.toInt());
+    }
+  }
+
+  /**
+   * Sets the acidOperationalProperties in the map object argument.
+   * @param parameters Mutable map object
+   * @param properties An acidOperationalProperties object to initialize from.
+   */
+  public static void setAcidOperationalProperties(
+      Map<String, String> parameters, AcidOperationalProperties properties) {
+    if (properties != null) {
+      parameters.put(ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, properties.toString());
+    }
+  }
+
+  /**
+   * Returns the acidOperationalProperties for a given table.
+   * @param table A table object
+   * @return the acidOperationalProperties object for the corresponding table.
+   */
+  public static AcidOperationalProperties getAcidOperationalProperties(Table table) {
+    String transactionalProperties = table.getProperty(
+        hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
+    if (transactionalProperties == null) {
+      // If the table does not define any transactional properties, we return a legacy type.
+      return AcidOperationalProperties.getLegacy();
+    }
+    return AcidOperationalProperties.parseString(transactionalProperties);
+  }
+
+  /**
+   * Returns the acidOperationalProperties for a given configuration.
+   * @param conf A configuration object
+   * @return the acidOperationalProperties object for the corresponding configuration.
+   */
+  public static AcidOperationalProperties getAcidOperationalProperties(Configuration conf) {
+    // If the conf does not define any transactional properties, parseInt() receives a value
+    // of zero, which decodes to the legacy type.
+    return AcidOperationalProperties.parseInt(
+        HiveConf.getIntVar(conf, ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES));
+  }
+
+  /**
+   * Returns the acidOperationalProperties for a given set of properties.
+   * @param props A properties object
+   * @return the acidOperationalProperties object for the corresponding properties.
+   */
+  public static AcidOperationalProperties getAcidOperationalProperties(Properties props) {
+    String resultStr = props.getProperty(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
+    if (resultStr == null) {
+      // If the properties object does not define any transactional properties,
+      // we return a legacy type.
+      return AcidOperationalProperties.getLegacy();
+    }
+    return AcidOperationalProperties.parseString(resultStr);
+  }
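Together, the setters and getters above form a simple round trip through a job configuration; a minimal sketch assuming a plain Configuration object:

    // Minimal sketch: the default properties travel through the conf as an int
    // and decode back to an equivalent object on read.
    // requires: import org.apache.hadoop.conf.Configuration;
    Configuration conf = new Configuration();
    AcidUtils.setAcidOperationalProperties(
        conf, AcidUtils.AcidOperationalProperties.getDefault());
    AcidUtils.AcidOperationalProperties read =
        AcidUtils.getAcidOperationalProperties(conf);
    assert read.isSplitUpdate();   // default behavior enables split update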
+
+  /**
+   * Returns the acidOperationalProperties for a given map.
+   * @param parameters A parameters map
+   * @return the acidOperationalProperties object for the corresponding map.
+   */
+  public static AcidOperationalProperties getAcidOperationalProperties(
+      Map<String, String> parameters) {
+    String resultStr = parameters.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
+    if (resultStr == null) {
+      // If the parameters map does not define any transactional properties,
+      // we return a legacy type.
+      return AcidOperationalProperties.getLegacy();
+    }
+    return AcidOperationalProperties.parseString(resultStr);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 945b828..c4b9940 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -637,6 +637,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
         pushFilters(jobConf, ts);
 
         AcidUtils.setTransactionalTableScan(job, ts.getConf().isAcidTable());
+        AcidUtils.setAcidOperationalProperties(job, ts.getConf().getAcidOperationalProperties());
       }
     }
   }