HIVE-14035 Enable predicate pushdown to delta files created by ACID Transactions (Saket Saurabh via Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ecab0d07
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ecab0d07
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ecab0d07

Branch: refs/heads/master
Commit: ecab0d072b50a8d85dca6e850e47425d96c1ac09
Parents: 333fa87
Author: Eugene Koifman <ekoif...@hortonworks.com>
Authored: Fri Aug 12 10:31:39 2016 -0700
Committer: Eugene Koifman <ekoif...@hortonworks.com>
Committed: Fri Aug 12 10:31:39 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  57 +-
 .../mapreduce/FosterStorageHandler.java         |   3 +
 .../streaming/AbstractRecordWriter.java         |   7 +
 .../hive/ql/txn/compactor/TestCompactor.java    | 318 +++++++++-
 metastore/if/hive_metastore.thrift              |   1 +
 .../thrift/gen-cpp/hive_metastore_constants.cpp |   2 +
 .../thrift/gen-cpp/hive_metastore_constants.h   |   1 +
 .../metastore/api/hive_metastoreConstants.java  |   2 +
 .../src/gen/thrift/gen-php/metastore/Types.php  |   5 +
 .../thrift/gen-py/hive_metastore/constants.py   |   1 +
 .../thrift/gen-rb/hive_metastore_constants.rb   |   2 +
 .../TransactionalValidationListener.java        | 126 +++-
 .../apache/hadoop/hive/ql/exec/FetchTask.java   |   1 +
 .../hadoop/hive/ql/exec/SMBMapJoinOperator.java |   1 +
 .../hadoop/hive/ql/exec/mr/MapredLocalTask.java |   1 +
 .../hadoop/hive/ql/io/AcidOutputFormat.java     |  25 +-
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 456 +++++++++++++--
 .../hadoop/hive/ql/io/HiveInputFormat.java      |   1 +
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   | 248 ++++++--
 .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java | 152 ++++-
 .../apache/hadoop/hive/ql/metadata/Hive.java    |  95 +--
 .../hadoop/hive/ql/plan/TableScanDesc.java      |   9 +
 .../hive/ql/txn/compactor/CompactorMR.java      |  99 +++-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java | 106 +++-
 .../ql/TestTxnCommands2WithSplitUpdate.java     | 584 +++++++++++++++++++
 .../apache/hadoop/hive/ql/io/TestAcidUtils.java | 275 ++++++++-
 .../hive/ql/io/orc/TestInputOutputFormat.java   | 225 +++++--
 27 files changed, 2515 insertions(+), 288 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 3e9f6ec..0abb788 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -18,7 +18,32 @@
 
 package org.apache.hadoop.hive.conf;
 
-import com.google.common.base.Joiner;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.security.auth.login.LoginException;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -42,27 +67,7 @@ import org.apache.hive.common.HiveCompat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.security.auth.login.LoginException;
-
-import java.io.*;
-import java.net.URI;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.net.URLEncoder;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import com.google.common.base.Joiner;
 
 /**
  * Hive Configuration.
@@ -261,6 +266,7 @@ public class HiveConf extends Configuration {
       HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
       HiveConf.ConfVars.HIVE_TXN_MANAGER,
       HiveConf.ConfVars.HIVE_TXN_TIMEOUT,
+      HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES,
       HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE,
       HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH,
       HiveConf.ConfVars.HIVE_TXN_RETRYABLE_SQLEX_REGEX,
@@ -1762,6 +1768,13 @@ public class HiveConf extends Configuration {
         " of the lock manager is dumped to log file.  This is for debugging.  
See also " +
         "hive.lock.numretries and hive.lock.sleep.between.retries."),
 
+    HIVE_TXN_OPERATIONAL_PROPERTIES("hive.txn.operational.properties", 0,
+        "Sets the operational properties that control the appropriate behavior 
for various\n"
+        + "versions of the Hive ACID subsystem. Setting it to zero will turn 
on the legacy mode\n"
+        + "for ACID, while setting it to one will enable a split-update 
feature found in the newer\n"
+        + "version of Hive ACID subsystem. Mostly it is intended to be used as 
an internal property\n"
+        + "for future versions of ACID. (See HIVE-14035 for details.)"),
+
     HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open 
transactions. If \n" +
         "current open transactions reach this limit, future open transaction 
requests will be \n" +
         "rejected, until this number goes below the limit."),

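Illustrative sketch (not part of this commit): the integer value of hive.txn.operational.properties is a bit mask that the AcidUtils.AcidOperationalProperties class added by this patch (see the AcidUtils hunk below) can decode. A value of 0 means legacy behavior, bit 0x01 enables split-update (updates rewritten as delete + insert), and bit 0x02 is reserved for the not-yet-implemented hash-based merge. Using only methods visible in this diff:

    // Sketch only: decodes the flag values described in the config help text above.
    AcidUtils.AcidOperationalProperties legacy =
        AcidUtils.AcidOperationalProperties.parseInt(0);             // legacy mode, no flags set
    AcidUtils.AcidOperationalProperties splitUpdate =
        AcidUtils.AcidOperationalProperties.parseInt(
            AcidUtils.AcidOperationalProperties.SPLIT_UPDATE_BIT);   // 0x01 == 1, split-update on
    AcidUtils.AcidOperationalProperties fromTableProperty =
        AcidUtils.AcidOperationalProperties.parseString("default");  // table-level 'default' also enables split-update
    assert !legacy.isSplitUpdate();
    assert splitUpdate.isSplitUpdate();
    assert fromTableProperty.isSplitUpdate();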
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
index 14f7316..b970153 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
@@ -133,6 +133,9 @@ public class FosterStorageHandler extends DefaultStorageHandler {
 
         boolean isAcidTable = AcidUtils.isTablePropertyTransactional(tableProperties);
         AcidUtils.setTransactionalTableScan(jobProperties, isAcidTable);
+        AcidUtils.AcidOperationalProperties acidOperationalProperties =
+                AcidUtils.getAcidOperationalProperties(tableProperties);
+        AcidUtils.setAcidOperationalProperties(jobProperties, acidOperationalProperties);
       }
     } catch (IOException e) {
       throw new IllegalStateException("Failed to set output path", e);

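Illustrative sketch (not part of this commit) of the plumbing this hunk adds: the table-level 'transactional_properties' value is resolved into AcidOperationalProperties and copied into the job configuration so that downstream readers can see whether split-update is in effect. The parameter types below are assumptions inferred from the calls in this hunk and in FetchTask further down; the full AcidUtils signatures are not shown in this diff.

    import java.util.Properties;
    import org.apache.hadoop.hive.ql.io.AcidUtils;
    import org.apache.hadoop.mapred.JobConf;

    public class AcidOperationalPropertiesPlumbingSketch {
      // Hypothetical helper mirroring FosterStorageHandler/FetchTask above.
      static void propagate(Properties tableProperties, JobConf job) {
        AcidUtils.AcidOperationalProperties props =
            AcidUtils.getAcidOperationalProperties(tableProperties); // reads 'transactional_properties'
        AcidUtils.setAcidOperationalProperties(job, props);          // presumably stored under hive.txn.operational.properties
      }
    }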
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
index 974c6b8..b8615cb 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
@@ -48,6 +48,7 @@ import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Properties;
 
 
 public abstract class AbstractRecordWriter implements RecordWriter {
@@ -265,10 +266,16 @@ public abstract class AbstractRecordWriter implements RecordWriter {
   private RecordUpdater createRecordUpdater(int bucketId, Long minTxnId, Long maxTxnID)
           throws IOException, SerializationError {
     try {
+      // Initialize table properties from the table parameters. This is required because the table
+      // may define certain table parameters that may be required while writing. The table parameter
+      // 'transactional_properties' is one such example.
+      Properties tblProperties = new Properties();
+      tblProperties.putAll(tbl.getParameters());
       return  outf.getRecordUpdater(partitionPath,
               new AcidOutputFormat.Options(conf)
                       .inspector(getSerde().getObjectInspector())
                       .bucket(bucketId)
+                      .tableProperties(tblProperties)
                       .minimumTransactionId(minTxnId)
                       .maximumTransactionId(maxTxnID)
                       .statementId(-1)

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 731caa8..f81752f 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -17,6 +17,19 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -68,20 +81,6 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.TimeUnit;
-
 /**
  */
 public class TestCompactor {
@@ -857,6 +856,297 @@ public class TestCompactor {
     }
   }
 
+  @Test
+  public void majorCompactWhileStreamingForSplitUpdate() throws Exception {
+    String dbName = "default";
+    String tblName = "cws";
+    List<String> colNames = Arrays.asList("a", "b");
+    String columnNamesProperty = "a,b";
+    String columnTypesProperty = "int:string";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+        " CLUSTERED BY(a) INTO 2 BUCKETS" + //currently ACID requires table to 
be bucketed
+        " STORED AS ORC  TBLPROPERTIES ('transactional'='true', "
+        + "'transactional_properties'='default') ", driver); // this turns on 
split-update U=D+I
+
+    HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
+    DelimitedInputWriter writer = new DelimitedInputWriter(new String[] 
{"a","b"},",", endPt);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + 
Thread.currentThread().getName());
+    try {
+      // Write a couple of batches
+      for (int i = 0; i < 2; i++) {
+        writeBatch(connection, writer, false);
+      }
+
+      // Start a third batch, but don't close it.
+      writeBatch(connection, writer, true);
+
+      // Now, compact
+      TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+      txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
+      Worker t = new Worker();
+      t.setThreadId((int) t.getId());
+      t.setHiveConf(conf);
+      AtomicBoolean stop = new AtomicBoolean(true);
+      AtomicBoolean looped = new AtomicBoolean();
+      t.init(stop, looped);
+      t.run();
+
+      // Find the location of the table
+      IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+      Table table = msClient.getTable(dbName, tblName);
+      FileSystem fs = FileSystem.get(conf);
+      FileStatus[] stat =
+          fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter);
+      if (1 != stat.length) {
+        Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat));
+      }
+      String name = stat[0].getPath().getName();
+      Assert.assertEquals(name, "base_0000004");
+      checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
+    } finally {
+      connection.close();
+    }
+  }
+
+  @Test
+  public void testMinorCompactionForSplitUpdateWithInsertsAndDeletes() throws Exception {
+    String agentInfo = "UT_" + Thread.currentThread().getName();
+    String dbName = "default";
+    String tblName = "cws";
+    List<String> colNames = Arrays.asList("a", "b");
+    String columnNamesProperty = "a,b";
+    String columnTypesProperty = "int:string";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+        " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to 
be bucketed
+        " STORED AS ORC  TBLPROPERTIES ('transactional'='true',"
+        + "'transactional_properties'='default')", driver);
+
+    // Insert some data -> this will generate only insert deltas and no delete 
deltas: delta_1_1
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 
'foo')", driver);
+
+    // Insert some data -> this will again generate only insert deltas and no 
delete deltas: delta_2_2
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 
'bar')", driver);
+
+    // Delete some data -> this will generate only delete deltas and no insert 
deltas: delete_delta_3_3
+    executeStatementOnDriver("DELETE FROM " + tblName +" WHERE a = 2", driver);
+
+    // Now, compact -> Compaction produces a single range for both delta and 
delete delta
+    // That is, both delta and delete_deltas would be compacted into delta_1_3 
and delete_delta_1_3
+    // even though there are only two delta_1_1, delta_2_2 and one 
delete_delta_3_3.
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR));
+    Worker t = new Worker();
+    t.setThreadId((int) t.getId());
+    t.setHiveConf(conf);
+    AtomicBoolean stop = new AtomicBoolean(true);
+    AtomicBoolean looped = new AtomicBoolean();
+    t.init(stop, looped);
+    t.run();
+
+    // Find the location of the table
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    Table table = msClient.getTable(dbName, tblName);
+    FileSystem fs = FileSystem.get(conf);
+
+    // Verify that we have got correct set of deltas.
+    FileStatus[] stat =
+        fs.listStatus(new Path(table.getSd().getLocation()), 
AcidUtils.deltaFileFilter);
+    String[] deltas = new String[stat.length];
+    Path minorCompactedDelta = null;
+    for (int i = 0; i < deltas.length; i++) {
+      deltas[i] = stat[i].getPath().getName();
+      if (deltas[i].equals("delta_0000001_0000003")) {
+        minorCompactedDelta = stat[i].getPath();
+      }
+    }
+    Arrays.sort(deltas);
+    String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", 
"delta_0000001_0000003", "delta_0000002_0000002_0000"};
+    if (!Arrays.deepEquals(expectedDeltas, deltas)) {
+      Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " 
+ Arrays.toString(deltas));
+    }
+    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, 
columnNamesProperty, columnTypesProperty, 0, 1L, 2L);
+
+    // Verify that we have got correct set of delete_deltas.
+    FileStatus[] deleteDeltaStat =
+        fs.listStatus(new Path(table.getSd().getLocation()), 
AcidUtils.deleteEventDeltaDirFilter);
+    String[] deleteDeltas = new String[deleteDeltaStat.length];
+    Path minorCompactedDeleteDelta = null;
+    for (int i = 0; i < deleteDeltas.length; i++) {
+      deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
+      if (deleteDeltas[i].equals("delete_delta_0000001_0000003")) {
+        minorCompactedDeleteDelta = deleteDeltaStat[i].getPath();
+      }
+    }
+    Arrays.sort(deleteDeltas);
+    String[] expectedDeleteDeltas = new 
String[]{"delete_delta_0000001_0000003", "delete_delta_0000003_0000003_0000"};
+    if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
+      Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", 
found: " + Arrays.toString(deleteDeltas));
+    }
+    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, 
columnNamesProperty, columnTypesProperty, 0, 2L, 2L);
+  }
+
+  @Test
+  public void testMinorCompactionForSplitUpdateWithOnlyInserts() throws Exception {
+    String agentInfo = "UT_" + Thread.currentThread().getName();
+    String dbName = "default";
+    String tblName = "cws";
+    List<String> colNames = Arrays.asList("a", "b");
+    String columnNamesProperty = "a,b";
+    String columnTypesProperty = "int:string";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+        " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
+        " STORED AS ORC  TBLPROPERTIES ('transactional'='true',"
+        + "'transactional_properties'='default')", driver);
+
+    // Insert some data -> this will generate only insert deltas and no delete deltas: delta_1_1
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver);
+
+    // Insert some data -> this will again generate only insert deltas and no delete deltas: delta_2_2
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver);
+
+    // Now, compact
+    // One important thing to note in this test is that minor compaction always produces
+    // delta_x_y and a counterpart delete_delta_x_y, even when there are no delete_delta events.
+    // Such a choice has been made to simplify processing of AcidUtils.getAcidState().
+
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR));
+    Worker t = new Worker();
+    t.setThreadId((int) t.getId());
+    t.setHiveConf(conf);
+    AtomicBoolean stop = new AtomicBoolean(true);
+    AtomicBoolean looped = new AtomicBoolean();
+    t.init(stop, looped);
+    t.run();
+
+    // Find the location of the table
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    Table table = msClient.getTable(dbName, tblName);
+    FileSystem fs = FileSystem.get(conf);
+
+    // Verify that we have got correct set of deltas.
+    FileStatus[] stat =
+        fs.listStatus(new Path(table.getSd().getLocation()), 
AcidUtils.deltaFileFilter);
+    String[] deltas = new String[stat.length];
+    Path minorCompactedDelta = null;
+    for (int i = 0; i < deltas.length; i++) {
+      deltas[i] = stat[i].getPath().getName();
+      if (deltas[i].equals("delta_0000001_0000002")) {
+        minorCompactedDelta = stat[i].getPath();
+      }
+    }
+    Arrays.sort(deltas);
+    String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", 
"delta_0000001_0000002", "delta_0000002_0000002_0000"};
+    if (!Arrays.deepEquals(expectedDeltas, deltas)) {
+      Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " 
+ Arrays.toString(deltas));
+    }
+    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, 
columnNamesProperty, columnTypesProperty, 0, 1L, 2L);
+
+    // Verify that we have got correct set of delete_deltas.
+    FileStatus[] deleteDeltaStat =
+        fs.listStatus(new Path(table.getSd().getLocation()), 
AcidUtils.deleteEventDeltaDirFilter);
+    String[] deleteDeltas = new String[deleteDeltaStat.length];
+    Path minorCompactedDeleteDelta = null;
+    for (int i = 0; i < deleteDeltas.length; i++) {
+      deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
+      if (deleteDeltas[i].equals("delete_delta_0000001_0000002")) {
+        minorCompactedDeleteDelta = deleteDeltaStat[i].getPath();
+      }
+    }
+    Arrays.sort(deleteDeltas);
+    String[] expectedDeleteDeltas = new 
String[]{"delete_delta_0000001_0000002"};
+    if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
+      Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", 
found: " + Arrays.toString(deleteDeltas));
+    }
+    // There should be no rows in the delete_delta because there have been no delete events.
+    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L);
+  }
+
+  @Test
+  public void minorCompactWhileStreamingWithSplitUpdate() throws Exception {
+    String dbName = "default";
+    String tblName = "cws";
+    List<String> colNames = Arrays.asList("a", "b");
+    String columnNamesProperty = "a,b";
+    String columnTypesProperty = "int:string";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+        " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to 
be bucketed
+        " STORED AS ORC  TBLPROPERTIES ('transactional'='true',"
+        + "'transactional_properties'='default')", driver);
+
+    HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
+    DelimitedInputWriter writer = new DelimitedInputWriter(new String[] 
{"a","b"},",", endPt);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + 
Thread.currentThread().getName());
+    try {
+      // Write a couple of batches
+      for (int i = 0; i < 2; i++) {
+        writeBatch(connection, writer, false);
+      }
+
+      // Start a third batch, but don't close it.
+      writeBatch(connection, writer, true);
+
+      // Now, compact
+      TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+      txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR));
+      Worker t = new Worker();
+      t.setThreadId((int) t.getId());
+      t.setHiveConf(conf);
+      AtomicBoolean stop = new AtomicBoolean(true);
+      AtomicBoolean looped = new AtomicBoolean();
+      t.init(stop, looped);
+      t.run();
+
+      // Find the location of the table
+      IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+      Table table = msClient.getTable(dbName, tblName);
+      FileSystem fs = FileSystem.get(conf);
+      FileStatus[] stat =
+          fs.listStatus(new Path(table.getSd().getLocation()), 
AcidUtils.deltaFileFilter);
+      String[] names = new String[stat.length];
+      Path resultFile = null;
+      for (int i = 0; i < names.length; i++) {
+        names[i] = stat[i].getPath().getName();
+        if (names[i].equals("delta_0000001_0000004")) {
+          resultFile = stat[i].getPath();
+        }
+      }
+      Arrays.sort(names);
+      String[] expected = new String[]{"delta_0000001_0000002",
+          "delta_0000001_0000004", "delta_0000003_0000004", 
"delta_0000005_0000006"};
+      if (!Arrays.deepEquals(expected, names)) {
+        Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + 
Arrays.toString(names));
+      }
+      checkExpectedTxnsPresent(null, new 
Path[]{resultFile},columnNamesProperty, columnTypesProperty,  0, 1L, 4L);
+
+      // Verify that we have got correct set of delete_deltas also
+      FileStatus[] deleteDeltaStat =
+          fs.listStatus(new Path(table.getSd().getLocation()), 
AcidUtils.deleteEventDeltaDirFilter);
+      String[] deleteDeltas = new String[deleteDeltaStat.length];
+      Path minorCompactedDeleteDelta = null;
+      for (int i = 0; i < deleteDeltas.length; i++) {
+        deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
+        if (deleteDeltas[i].equals("delete_delta_0000001_0000004")) {
+          minorCompactedDeleteDelta = deleteDeltaStat[i].getPath();
+        }
+      }
+      Arrays.sort(deleteDeltas);
+      String[] expectedDeleteDeltas = new 
String[]{"delete_delta_0000001_0000004"};
+      if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
+        Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", 
found: " + Arrays.toString(deleteDeltas));
+      }
+      // There should be no rows in the delete_delta because there have been no delete events.
+      checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L);
+
+    } finally {
+      connection.close();
+    }
+  }
+
   /**
   * Users have the choice of specifying compaction related tblproperties either in CREATE TABLE
    * statement or in ALTER TABLE .. COMPACT statement. This tests both cases.

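Illustrative sketch (not part of this commit) of the directory layout these tests assert. With 'transactional_properties'='default' (split-update), insert events land in delta_* directories and delete events in delete_delta_* directories, e.g. delta_0000001_0000001_0000, delta_0000002_0000002_0000 and delete_delta_0000003_0000003_0000; a minor compaction collapses them into ranges such as delta_0000001_0000003 and delete_delta_0000001_0000003. The two path filters in AcidUtils keep the kinds apart:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class ListAcidDirsSketch {
      public static void main(String[] args) throws Exception {
        Path tableDir = new Path(args[0]);  // table location, e.g. from the metastore
        FileSystem fs = tableDir.getFileSystem(new Configuration());
        FileStatus[] insertDeltas = fs.listStatus(tableDir, AcidUtils.deltaFileFilter);           // delta_*
        FileStatus[] deleteDeltas = fs.listStatus(tableDir, AcidUtils.deleteEventDeltaDirFilter); // delete_delta_*
        System.out.println(insertDeltas.length + " insert delta dirs, " + deleteDeltas.length + " delete delta dirs");
      }
    }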
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/if/hive_metastore.thrift
----------------------------------------------------------------------
diff --git a/metastore/if/hive_metastore.thrift b/metastore/if/hive_metastore.thrift
index a2e35b8..872c0f3 100755
--- a/metastore/if/hive_metastore.thrift
+++ b/metastore/if/hive_metastore.thrift
@@ -1475,5 +1475,6 @@ const string FILE_OUTPUT_FORMAT   = "file.outputformat",
 const string META_TABLE_STORAGE   = "storage_handler",
 const string TABLE_IS_TRANSACTIONAL = "transactional",
 const string TABLE_NO_AUTO_COMPACT = "no_auto_compaction",
+const string TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties",
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp b/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp
index f982bf2..1cbd176 100644
--- a/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp
+++ b/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp
@@ -57,6 +57,8 @@ hive_metastoreConstants::hive_metastoreConstants() {
 
   TABLE_NO_AUTO_COMPACT = "no_auto_compaction";
 
+  TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties";
+
 }
 
 }}} // namespace

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h b/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h
index ae14bd1..3d068c3 100644
--- a/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h
+++ b/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h
@@ -38,6 +38,7 @@ class hive_metastoreConstants {
   std::string META_TABLE_STORAGE;
   std::string TABLE_IS_TRANSACTIONAL;
   std::string TABLE_NO_AUTO_COMPACT;
+  std::string TABLE_TRANSACTIONAL_PROPERTIES;
 };
 
 extern const hive_metastoreConstants g_hive_metastore_constants;

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
index 5a666f2..8de8896 100644
--- a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
+++ b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
@@ -82,4 +82,6 @@ public class hive_metastoreConstants {
 
   public static final String TABLE_NO_AUTO_COMPACT = "no_auto_compaction";
 
+  public static final String TABLE_TRANSACTIONAL_PROPERTIES = 
"transactional_properties";
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/src/gen/thrift/gen-php/metastore/Types.php
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-php/metastore/Types.php b/metastore/src/gen/thrift/gen-php/metastore/Types.php
index d6f7f49..2f9cc9b 100644
--- a/metastore/src/gen/thrift/gen-php/metastore/Types.php
+++ b/metastore/src/gen/thrift/gen-php/metastore/Types.php
@@ -18865,6 +18865,7 @@ final class Constant extends \Thrift\Type\TConstant {
   static protected $META_TABLE_STORAGE;
   static protected $TABLE_IS_TRANSACTIONAL;
   static protected $TABLE_NO_AUTO_COMPACT;
+  static protected $TABLE_TRANSACTIONAL_PROPERTIES;
 
   static protected function init_DDL_TIME() {
     return "transient_lastDdlTime";
@@ -18957,6 +18958,10 @@ final class Constant extends \Thrift\Type\TConstant {
   static protected function init_TABLE_NO_AUTO_COMPACT() {
     return "no_auto_compaction";
   }
+
+  static protected function init_TABLE_TRANSACTIONAL_PROPERTIES() {
+    return "transactional_properties";
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/src/gen/thrift/gen-py/hive_metastore/constants.py
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-py/hive_metastore/constants.py b/metastore/src/gen/thrift/gen-py/hive_metastore/constants.py
index d1c07a5..5100236 100644
--- a/metastore/src/gen/thrift/gen-py/hive_metastore/constants.py
+++ b/metastore/src/gen/thrift/gen-py/hive_metastore/constants.py
@@ -32,3 +32,4 @@ FILE_OUTPUT_FORMAT = "file.outputformat"
 META_TABLE_STORAGE = "storage_handler"
 TABLE_IS_TRANSACTIONAL = "transactional"
 TABLE_NO_AUTO_COMPACT = "no_auto_compaction"
+TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties"

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb b/metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb
index eeccc84..6aa7143 100644
--- a/metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb
+++ b/metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb
@@ -53,3 +53,5 @@ TABLE_IS_TRANSACTIONAL = %q"transactional"
 
 TABLE_NO_AUTO_COMPACT = %q"no_auto_compaction"
 
+TABLE_TRANSACTIONAL_PROPERTIES = %q"transactional_properties"
+

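Illustrative sketch (not part of this commit): the new metastore constant can be used to read the table parameter on the client side. Note that the validation listener below matches the key case-insensitively and lower-cases the value; this sketch assumes the normalized key.

    import java.util.Map;
    import org.apache.hadoop.hive.metastore.api.Table;
    import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;

    public class ReadTransactionalPropertiesSketch {
      /** Returns the table's transactional_properties value, or null if unset. */
      static String transactionalProperties(Table table) {
        Map<String, String> params = table.getParameters();
        return params == null ? null
            : params.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
      }
    }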
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
index 3e74675..0f08f43 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
@@ -17,25 +17,35 @@
  */
 package org.apache.hadoop.hive.metastore;
 
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.events.PreAlterTableEvent;
 import org.apache.hadoop.hive.metastore.events.PreCreateTableEvent;
 import org.apache.hadoop.hive.metastore.events.PreEventContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-final class TransactionalValidationListener extends MetaStorePreEventListener {
+public final class TransactionalValidationListener extends MetaStorePreEventListener {
   public static final Logger LOG = LoggerFactory.getLogger(TransactionalValidationListener.class);
 
+  // These constants are also imported by org.apache.hadoop.hive.ql.io.AcidUtils.
+  public static final String DEFAULT_TRANSACTIONAL_PROPERTY = "default";
+  public static final String LEGACY_TRANSACTIONAL_PROPERTY = "legacy";
+
   TransactionalValidationListener(Configuration conf) {
     super(conf);
   }
 
+  @Override
   public void onEvent(PreEventContext context) throws MetaException, NoSuchObjectException,
       InvalidOperationException {
     switch (context.getEventType()) {
@@ -60,6 +70,8 @@ final class TransactionalValidationListener extends MetaStorePreEventListener {
 
   /**
    * once a table is marked transactional, you cannot go back.  Enforce this.
+   * Also, in the current version, 'transactional_properties' of the table cannot be altered after
+   * the table is created. Any attempt to alter it will throw a MetaException.
    */
   private void handleAlterTableTransactionalProp(PreAlterTableEvent context) throws MetaException {
     Table newTable = context.getNewTable();
@@ -70,12 +82,22 @@ final class TransactionalValidationListener extends MetaStorePreEventListener {
     Set<String> keys = new HashSet<>(parameters.keySet());
     String transactionalValue = null;
     boolean transactionalValuePresent = false;
+    boolean isTransactionalPropertiesPresent = false;
+    String transactionalPropertiesValue = null;
+    boolean hasValidTransactionalValue = false;
+
     for (String key : keys) {
       if(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.equalsIgnoreCase(key)) {
         transactionalValuePresent = true;
         transactionalValue = parameters.get(key);
         parameters.remove(key);
       }
+      if(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES.equalsIgnoreCase(key)) {
+        isTransactionalPropertiesPresent = true;
+        transactionalPropertiesValue = parameters.get(key);
+        // Do not remove the parameter yet, because we have a separate initialization routine
+        // that will use it down below.
+      }
     }
     if (transactionalValuePresent) {
       //normalize prop name
@@ -91,24 +113,52 @@ final class TransactionalValidationListener extends MetaStorePreEventListener {
         throw new MetaException(newTable.getDbName() + "." + newTable.getTableName() +
             " cannot be declared transactional because it's an external table");
       }
-
-      return;
+      hasValidTransactionalValue = true;
     }
+
     Table oldTable = context.getOldTable();
     String oldTransactionalValue = null;
+    String oldTransactionalPropertiesValue = null;
     for (String key : oldTable.getParameters().keySet()) {
       if (hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.equalsIgnoreCase(key)) {
         oldTransactionalValue = oldTable.getParameters().get(key);
       }
+      if (hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES.equalsIgnoreCase(key)) {
+        oldTransactionalPropertiesValue = oldTable.getParameters().get(key);
+      }
     }
+
+
     if (oldTransactionalValue == null ? transactionalValue == null
                                      : oldTransactionalValue.equalsIgnoreCase(transactionalValue)) {
       //this covers backward compat cases where this prop may have been set already
-      return;
+      hasValidTransactionalValue = true;
+    }
+
+    if (!hasValidTransactionalValue) {
+      // if here, there is attempt to set transactional to something other than 'true'
+      // and NOT the same value it was before
+      throw new MetaException("TBLPROPERTIES with 'transactional'='true' cannot be unset");
+    }
+
+    if (isTransactionalPropertiesPresent) {
+      // Now validate transactional_properties for the table.
+      if (oldTransactionalValue == null) {
+        // If this is the first time the table is being initialized to 'transactional=true',
+        // any valid value can be set for the 'transactional_properties'.
+        initializeTransactionalProperties(newTable);
+      } else {
+        // If the table was already marked as 'transactional=true', then the new value of
+        // 'transactional_properties' must match the old value. Any attempt to alter the previous
+        // value will throw an error. An exception will still be thrown if the previous value was
+        // null and an attempt is made to set it. This behaviour can be changed in the future.
+        if (oldTransactionalPropertiesValue == null
+            || !oldTransactionalPropertiesValue.equalsIgnoreCase(transactionalPropertiesValue) ) {
+          throw new MetaException("TBLPROPERTIES with 'transactional_properties' cannot be "
+              + "altered after the table is created");
+        }
+      }
     }
-    // if here, there is attempt to set transactional to something other than 'true'
-    // and NOT the same value it was before
-    throw new MetaException("TBLPROPERTIES with 'transactional'='true' cannot be unset");
   }
 
   /**
@@ -157,6 +207,7 @@ final class TransactionalValidationListener extends MetaStorePreEventListener {
 
       // normalize prop name
       parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.TRUE.toString());
+      initializeTransactionalProperties(newTable);
       return;
     }
 
@@ -187,4 +238,53 @@ final class TransactionalValidationListener extends MetaStorePreEventListener {
 
     return true;
   }
-}
\ No newline at end of file
+
+  private void initializeTransactionalProperties(Table table) throws MetaException {
+    // All new versions of Acid tables created after the introduction of the Acid version/type system
+    // can have the TRANSACTIONAL_PROPERTIES property defined. This parameter can be used to change
+    // the operational behavior of ACID. However, if this parameter is not defined, the new Acid
+    // tables will still behave as the old ones. This is done to preserve the behavior
+    // in case of a rolling downgrade.
+
+    // Initialize transaction table properties with default string value.
+    String tableTransactionalProperties = null;
+
+    Map<String, String> parameters = table.getParameters();
+    if (parameters != null) {
+      Set<String> keys = parameters.keySet();
+      for (String key : keys) {
+        if (hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES.equalsIgnoreCase(key)) {
+          tableTransactionalProperties = parameters.get(key).toLowerCase();
+          parameters.remove(key);
+          String validationError = validateTransactionalProperties(tableTransactionalProperties);
+          if (validationError != null) {
+            throw new MetaException("Invalid transactional properties specified for the "
+                + "table with the error " + validationError);
+          }
+          break;
+        }
+      }
+    }
+
+    if (tableTransactionalProperties != null) {
+      parameters.put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES,
+              tableTransactionalProperties);
+    }
+  }
+
+  private String validateTransactionalProperties(String transactionalProperties) {
+    boolean isValid = false;
+    switch (transactionalProperties) {
+      case DEFAULT_TRANSACTIONAL_PROPERTY:
+      case LEGACY_TRANSACTIONAL_PROPERTY:
+        isValid = true;
+        break;
+      default:
+        isValid = false;
+    }
+    if (!isValid) {
+      return "unknown value " + transactionalProperties +  " for 
transactional_properties";
+    }
+    return null; // All checks passed, return null.
+  }
+}

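In short, the listener enforces two rules: 'transactional_properties' may only be 'default' or 'legacy', and once a transactional table has been created the value cannot be changed by ALTER TABLE. A minimal sketch (not part of this commit) mirroring the value check in validateTransactionalProperties above:

    public class TransactionalPropertiesRulesSketch {
      // Hypothetical helper: anything other than 'default' or 'legacy' is
      // rejected by the listener with a MetaException.
      static boolean isValidTransactionalProperties(String value) {
        return "default".equalsIgnoreCase(value) || "legacy".equalsIgnoreCase(value);
      }
    }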
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
index db6848a..8c7d99d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
@@ -81,6 +81,7 @@ public class FetchTask extends Task<FetchWork> implements Serializable {
         HiveInputFormat.pushFilters(job, ts);
 
         AcidUtils.setTransactionalTableScan(job, ts.getConf().isAcidTable());
+        AcidUtils.setAcidOperationalProperties(job, ts.getConf().getAcidOperationalProperties());
       }
       sink = work.getSink();
       fetch = new FetchOperator(work, job, source, getVirtualColumns(source));

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
index 57b6c67..584eff4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
@@ -212,6 +212,7 @@ public class SMBMapJoinOperator extends AbstractMapJoinOperator<SMBJoinDesc> imp
       HiveInputFormat.pushFilters(jobClone, ts);
 
       AcidUtils.setTransactionalTableScan(jobClone, ts.getConf().isAcidTable());
+      AcidUtils.setAcidOperationalProperties(jobClone, ts.getConf().getAcidOperationalProperties());
 
       ts.passExecContext(getExecContext());
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
index 26e6443..ac922ce 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
@@ -472,6 +472,7 @@ public class MapredLocalTask extends Task<MapredLocalWork> implements Serializab
       HiveInputFormat.pushFilters(jobClone, ts);
 
       AcidUtils.setTransactionalTableScan(jobClone, ts.getConf().isAcidTable());
+      AcidUtils.setAcidOperationalProperties(jobClone, ts.getConf().getAcidOperationalProperties());
 
       // create a fetch operator
       FetchOperator fetchOp = new FetchOperator(entry.getValue(), jobClone);

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
index dd90a95..b85b827 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
@@ -18,6 +18,10 @@
 
 package org.apache.hadoop.hive.ql.io;
 
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Properties;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -26,10 +30,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.Reporter;
 
-import java.io.IOException;
-import java.io.PrintStream;
-import java.util.Properties;
-
 /**
  * An extension for OutputFormats that want to implement ACID transactions.
  * @param <V> the row type of the file
@@ -44,6 +44,7 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
     private FileSystem fs;
     private ObjectInspector inspector;
     private boolean writingBase = false;
+    private boolean writingDeleteDelta = false;
     private boolean isCompressed = false;
     private Properties properties;
     private Reporter reporter;
@@ -98,6 +99,16 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
     }
 
     /**
+     * Is this writing a delete delta directory?
+     * @param val is this a delete delta file?
+     * @return this
+     */
+    public Options writingDeleteDelta(boolean val) {
+      this.writingDeleteDelta = val;
+      return this;
+    }
+
+    /**
      * Provide a file system to the writer. Otherwise, the filesystem for the
      * path will be used.
      * @param fs the file system that corresponds to the the path
@@ -223,7 +234,7 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
       this.finalDestination = p;
       return this;
     }
-    
+
     public Configuration getConfiguration() {
       return configuration;
     }
@@ -260,6 +271,10 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
       return writingBase;
     }
 
+    public boolean isWritingDeleteDelta() {
+      return writingDeleteDelta;
+    }
+
     public int getBucket() {
       return bucket;
     }

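Illustrative sketch (not part of this commit): the new writingDeleteDelta flag tells AcidUtils to place the bucket file under a delete_delta_<min>_<max>_<statementId> directory instead of delta_<min>_<max>_<statementId> (see the deleteDeltaSubdir additions in the AcidUtils hunk below). The transaction and bucket values here are made up for illustration.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.io.AcidOutputFormat;

    public class DeleteDeltaOptionsSketch {
      static AcidOutputFormat.Options deleteDeltaOptions(Configuration conf) {
        return new AcidOutputFormat.Options(conf)
            .writingDeleteDelta(true)   // target a delete_delta_* directory
            .minimumTransactionId(3L)
            .maximumTransactionId(3L)
            .statementId(0)
            .bucket(0);
      }
    }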
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 449d889..cda5f39 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -18,10 +18,14 @@
 
 package org.apache.hadoop.hive.ql.io;
 
-import org.apache.hadoop.hive.metastore.api.DataOperationType;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -30,22 +34,19 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.shims.HadoopShims;
-import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.regex.Pattern;
-
 /**
  * Utilities that are shared by all of the ACID input and output formats. They
  * are used by the compactor and cleaner and thus must be format agnostic.
@@ -61,6 +62,7 @@ public class AcidUtils {
     }
   };
   public static final String DELTA_PREFIX = "delta_";
+  public static final String DELETE_DELTA_PREFIX = "delete_delta_";
   public static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length";
   public static final PathFilter deltaFileFilter = new PathFilter() {
     @Override
@@ -68,6 +70,12 @@ public class AcidUtils {
       return path.getName().startsWith(DELTA_PREFIX);
     }
   };
+  public static final PathFilter deleteEventDeltaDirFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return path.getName().startsWith(DELETE_DELTA_PREFIX);
+    }
+  };
   public static final String BUCKET_PREFIX = "bucket_";
   public static final PathFilter bucketFileFilter = new PathFilter() {
     @Override
@@ -142,6 +150,25 @@ public class AcidUtils {
     return deltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, 
statementId);
   }
 
+  /**
+   * This is format of delete delta dir name prior to Hive 2.2.x
+   */
+  @VisibleForTesting
+  static String deleteDeltaSubdir(long min, long max) {
+    return DELETE_DELTA_PREFIX + String.format(DELTA_DIGITS, min) + "_" +
+        String.format(DELTA_DIGITS, max);
+  }
+
+  /**
+   * Each write statement in a transaction creates its own delete delta dir,
+   * when split-update acid operational property is turned on.
+   * @since 2.2.x
+   */
+  @VisibleForTesting
+  static String deleteDeltaSubdir(long min, long max, int statementId) {
+    return deleteDeltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, 
statementId);
+  }
+
   public static String baseDir(long txnId) {
     return BASE_PREFIX + String.format(DELTA_DIGITS, txnId);
   }
@@ -163,12 +190,19 @@ public class AcidUtils {
     } else if(options.getStatementId() == -1) {
       //when minor compaction runs, we collapse per statement delta files inside a single
       //transaction so we no longer need a statementId in the file name
-      subdir = deltaSubdir(options.getMinimumTransactionId(),
-        options.getMaximumTransactionId());
+      subdir = options.isWritingDeleteDelta() ?
+          deleteDeltaSubdir(options.getMinimumTransactionId(),
+                            options.getMaximumTransactionId())
+          : deltaSubdir(options.getMinimumTransactionId(),
+                        options.getMaximumTransactionId());
     } else {
-      subdir = deltaSubdir(options.getMinimumTransactionId(),
-        options.getMaximumTransactionId(),
-        options.getStatementId());
+      subdir = options.isWritingDeleteDelta() ?
+          deleteDeltaSubdir(options.getMinimumTransactionId(),
+                            options.getMaximumTransactionId(),
+                            options.getStatementId())
+          : deltaSubdir(options.getMinimumTransactionId(),
+                        options.getMaximumTransactionId(),
+                        options.getStatementId());
     }
     return createBucketFile(new Path(directory, subdir), options.getBucket());
   }
@@ -195,11 +229,10 @@ public class AcidUtils {
    * @return the options used to create that filename
    */
   public static AcidOutputFormat.Options
-                    parseBaseBucketFilename(Path bucketFile,
-                                            Configuration conf) {
+                    parseBaseOrDeltaBucketFilename(Path bucketFile,
+                                                   Configuration conf) {
     AcidOutputFormat.Options result = new AcidOutputFormat.Options(conf);
     String filename = bucketFile.getName();
-    result.writingBase(true);
     if (ORIGINAL_PATTERN.matcher(filename).matches()) {
       int bucket =
           Integer.parseInt(filename.substring(0, filename.indexOf('_')));
@@ -207,15 +240,33 @@ public class AcidUtils {
           .setOldStyle(true)
           .minimumTransactionId(0)
           .maximumTransactionId(0)
-          .bucket(bucket);
+          .bucket(bucket)
+          .writingBase(true);
     } else if (filename.startsWith(BUCKET_PREFIX)) {
       int bucket =
           Integer.parseInt(filename.substring(filename.indexOf('_') + 1));
-      result
-          .setOldStyle(false)
-          .minimumTransactionId(0)
-          .maximumTransactionId(parseBase(bucketFile.getParent()))
-          .bucket(bucket);
+      if (bucketFile.getParent().getName().startsWith(BASE_PREFIX)) {
+        result
+            .setOldStyle(false)
+            .minimumTransactionId(0)
+            .maximumTransactionId(parseBase(bucketFile.getParent()))
+            .bucket(bucket)
+            .writingBase(true);
+      } else if (bucketFile.getParent().getName().startsWith(DELTA_PREFIX)) {
+        ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELTA_PREFIX);
+        result
+            .setOldStyle(false)
+            .minimumTransactionId(parsedDelta.minTransaction)
+            .maximumTransactionId(parsedDelta.maxTransaction)
+            .bucket(bucket);
+      } else if (bucketFile.getParent().getName().startsWith(DELETE_DELTA_PREFIX)) {
+        ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELETE_DELTA_PREFIX);
+        result
+            .setOldStyle(false)
+            .minimumTransactionId(parsedDelta.minTransaction)
+            .maximumTransactionId(parsedDelta.maxTransaction)
+            .bucket(bucket);
+      }
     } else {
       result.setOldStyle(true).bucket(-1).minimumTransactionId(0)
           .maximumTransactionId(0);
@@ -248,6 +299,179 @@ public class AcidUtils {
     }
   }
 
+  public enum AcidBaseFileType {
+    COMPACTED_BASE, // a regular base file generated through major compaction
+    ORIGINAL_BASE, // a non-acid schema file for tables that got converted to acid
+    INSERT_DELTA; // a delta file with only insert events that can be treated as base for split-update
+  }
+
+  /**
+   * A simple wrapper class that stores the information about a base file and 
its type.
+   * Orc splits can be generated on three kinds of base files: an original file (non-acid converted
+   * files), a regular base file (created by major compaction) or an insert delta (which can be
+   * treated as a base when split-update is enabled for acid).
+   */
+  public static class AcidBaseFileInfo {
+    final private HdfsFileStatusWithId fileId;
+    final private AcidBaseFileType acidBaseFileType;
+
+    public AcidBaseFileInfo(HdfsFileStatusWithId fileId, AcidBaseFileType acidBaseFileType) {
+      this.fileId = fileId;
+      this.acidBaseFileType = acidBaseFileType;
+    }
+
+    public boolean isCompactedBase() {
+      return this.acidBaseFileType == AcidBaseFileType.COMPACTED_BASE;
+    }
+
+    public boolean isOriginal() {
+      return this.acidBaseFileType == AcidBaseFileType.ORIGINAL_BASE;
+    }
+
+    public boolean isInsertDelta() {
+      return this.acidBaseFileType == AcidBaseFileType.INSERT_DELTA;
+    }
+
+    public HdfsFileStatusWithId getHdfsFileStatusWithId() {
+      return this.fileId;
+    }
+  }
+
+  public static class AcidOperationalProperties {
+    private int description = 0x00;
+    public static final int SPLIT_UPDATE_BIT = 0x01;
+    public static final String SPLIT_UPDATE_STRING = "split_update";
+    public static final int HASH_BASED_MERGE_BIT = 0x02;
+    public static final String HASH_BASED_MERGE_STRING = "hash_merge";
+    public static final String DEFAULT_VALUE_STRING = TransactionalValidationListener.DEFAULT_TRANSACTIONAL_PROPERTY;
+    public static final String LEGACY_VALUE_STRING = TransactionalValidationListener.LEGACY_TRANSACTIONAL_PROPERTY;
+
+    private AcidOperationalProperties() {
+    }
+
+    /**
+     * Returns an acidOperationalProperties object that represents ACID behavior for legacy tables
+     * that were created before the ACID type system using operational properties was put in place.
+     * @return the acidOperationalProperties object
+     */
+    public static AcidOperationalProperties getLegacy() {
+      AcidOperationalProperties obj = new AcidOperationalProperties();
+      // In legacy mode, none of these properties are turned on.
+      return obj;
+    }
+
+    /**
+     * Returns an acidOperationalProperties object that represents default ACID behavior for tables
+     * that do not explicitly specify/override the default behavior.
+     * @return the acidOperationalProperties object.
+     */
+    public static AcidOperationalProperties getDefault() {
+      AcidOperationalProperties obj = new AcidOperationalProperties();
+      obj.setSplitUpdate(true);
+      obj.setHashBasedMerge(false);
+      return obj;
+    }
+
+    /**
+     * Returns an acidOperationalProperties object that is represented by an encoded string.
+     * @param propertiesStr an encoded string representing the acidOperationalProperties.
+     * @return the acidOperationalProperties object.
+     */
+    public static AcidOperationalProperties parseString(String propertiesStr) {
+      if (propertiesStr == null) {
+        return AcidOperationalProperties.getLegacy();
+      }
+      if (propertiesStr.equalsIgnoreCase(DEFAULT_VALUE_STRING)) {
+        return AcidOperationalProperties.getDefault();
+      }
+      if (propertiesStr.equalsIgnoreCase(LEGACY_VALUE_STRING)) {
+        return AcidOperationalProperties.getLegacy();
+      }
+      AcidOperationalProperties obj = new AcidOperationalProperties();
+      String[] options = propertiesStr.split("\\|");
+      for (String option : options) {
+        if (option.trim().length() == 0) continue; // ignore empty strings
+        switch (option) {
+          case SPLIT_UPDATE_STRING:
+            obj.setSplitUpdate(true);
+            break;
+          case HASH_BASED_MERGE_STRING:
+            obj.setHashBasedMerge(true);
+            break;
+          default:
+            throw new IllegalArgumentException(
+                "Unexpected value " + option + " for ACID operational properties!");
+        }
+      }
+      return obj;
+    }
+
+    /**
+     * Returns an acidOperationalProperties object that is represented by an encoded 32-bit integer.
+     * @param properties an encoded 32-bit integer representing the acidOperationalProperties.
+     * @return the acidOperationalProperties object.
+     */
+    public static AcidOperationalProperties parseInt(int properties) {
+      AcidOperationalProperties obj = new AcidOperationalProperties();
+      if ((properties & SPLIT_UPDATE_BIT)  > 0) {
+        obj.setSplitUpdate(true);
+      }
+      if ((properties & HASH_BASED_MERGE_BIT)  > 0) {
+        obj.setHashBasedMerge(true);
+      }
+      return obj;
+    }
+
+    /**
+     * Sets the split update property for ACID operations based on the boolean argument.
+     * When split update is turned on, an update ACID event is interpreted as a combination of
+     * a delete event followed by an insert event.
+     * @param isSplitUpdate a boolean property that turns on split update when true.
+     * @return the acidOperationalProperties object.
+     */
+    public AcidOperationalProperties setSplitUpdate(boolean isSplitUpdate) {
+      description = (isSplitUpdate
+              ? (description | SPLIT_UPDATE_BIT) : (description & ~SPLIT_UPDATE_BIT));
+      return this;
+    }
+
+    /**
+     * Sets the hash-based merge property for ACID operations that combines delta files using
+     * GRACE hash join based approach, when turned on. (Currently unimplemented!)
+     * @param isHashBasedMerge a boolean property that turns on hash-based merge when true.
+     * @return the acidOperationalProperties object.
+     */
+    public AcidOperationalProperties setHashBasedMerge(boolean isHashBasedMerge) {
+      description = (isHashBasedMerge
+              ? (description | HASH_BASED_MERGE_BIT) : (description & ~HASH_BASED_MERGE_BIT));
+      return this;
+    }
+
+    public boolean isSplitUpdate() {
+      return (description & SPLIT_UPDATE_BIT) > 0;
+    }
+
+    public boolean isHashBasedMerge() {
+      return (description & HASH_BASED_MERGE_BIT) > 0;
+    }
+
+    public int toInt() {
+      return description;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder str = new StringBuilder();
+      if (isSplitUpdate()) {
+        str.append("|" + SPLIT_UPDATE_STRING);
+      }
+      if (isHashBasedMerge()) {
+        str.append("|" + HASH_BASED_MERGE_STRING);
+      }
+      return str.toString();
+    }
+  }
+
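
As a rough sketch of the encoding above (illustrative only; the literal value of DEFAULT_VALUE_STRING comes from TransactionalValidationListener and is not repeated here), the string and integer forms round-trip through parseString() and parseInt():

    import org.apache.hadoop.hive.ql.io.AcidUtils.AcidOperationalProperties;

    class AcidPropsSketch {
      public static void main(String[] args) {
        AcidOperationalProperties p =
            AcidOperationalProperties.parseString(AcidOperationalProperties.DEFAULT_VALUE_STRING);
        System.out.println(p.isSplitUpdate());    // true: the default turns split-update on
        System.out.println(p.isHashBasedMerge()); // false
        System.out.println(p.toInt());            // 1: only SPLIT_UPDATE_BIT is set

        AcidOperationalProperties q = AcidOperationalProperties.parseInt(0x03);
        System.out.println(q.toString());         // "|split_update|hash_merge"

        // Legacy tables (no transactional_properties at all) keep every bit off.
        System.out.println(AcidOperationalProperties.getLegacy().toInt()); // 0
      }
    }
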
   public static interface Directory {
 
     /**
@@ -287,18 +511,20 @@ public class AcidUtils {
     //-1 is for internal (getAcidState()) purposes and means the delta dir
     //had no statement ID
     private final int statementId;
+    private final boolean isDeleteDelta; // records whether delta dir is of type 'delete_delta_x_y...'
 
     /**
      * for pre 1.3.x delta files
      */
-    ParsedDelta(long min, long max, FileStatus path) {
-      this(min, max, path, -1);
+    ParsedDelta(long min, long max, FileStatus path, boolean isDeleteDelta) {
+      this(min, max, path, -1, isDeleteDelta);
     }
-    ParsedDelta(long min, long max, FileStatus path, int statementId) {
+    ParsedDelta(long min, long max, FileStatus path, int statementId, boolean isDeleteDelta) {
       this.minTransaction = min;
       this.maxTransaction = max;
       this.path = path;
       this.statementId = statementId;
+      this.isDeleteDelta = isDeleteDelta;
     }
 
     public long getMinTransaction() {
@@ -317,6 +543,10 @@ public class AcidUtils {
       return statementId == -1 ? 0 : statementId;
     }
 
+    public boolean isDeleteDelta() {
+      return isDeleteDelta;
+    }
+
     /**
      * Compactions (Major/Minor) merge deltas/bases but delete of old files
      * happens in a different process; thus it's possible to have bases/deltas 
with
@@ -418,15 +648,49 @@ public class AcidUtils {
     return results.toArray(new Path[results.size()]);
   }
 
-  private static ParsedDelta parseDelta(FileStatus path) {
-    ParsedDelta p = parsedDelta(path.getPath());
-    return new ParsedDelta(p.getMinTransaction(),
-      p.getMaxTransaction(), path, p.statementId);
+  /**
+   * Convert the list of begin/end transaction id pairs to a list of delete delta
+   * directories.  Note that there may be multiple delete_delta files for the exact same txn range starting
+   * with 2.2.x;
+   * see {@link org.apache.hadoop.hive.ql.io.AcidUtils#deltaSubdir(long, long, int)}
+   * @param root the root directory
+   * @param deleteDeltas list of begin/end transaction id pairs
+   * @return the list of delta paths
+   */
+  public static Path[] deserializeDeleteDeltas(Path root, final List<AcidInputFormat.DeltaMetaData> deleteDeltas) throws IOException {
+    List<Path> results = new ArrayList<Path>(deleteDeltas.size());
+    for(AcidInputFormat.DeltaMetaData dmd : deleteDeltas) {
+      if(dmd.getStmtIds().isEmpty()) {
+        results.add(new Path(root, deleteDeltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId())));
+        continue;
+      }
+      for(Integer stmtId : dmd.getStmtIds()) {
+        results.add(new Path(root, deleteDeltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId(), stmtId)));
+      }
+    }
+    return results.toArray(new Path[results.size()]);
   }
+
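
For example (illustrative txn ids, ignoring the zero-padding applied by deleteDeltaSubdir()), a DeltaMetaData covering txns 21..23 with no statement ids resolves to a single directory, while the same range with statement ids 0 and 1 resolves to one directory per statement:

    root/delete_delta_21_23
    root/delete_delta_21_23_0
    root/delete_delta_21_23_1
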
   public static ParsedDelta parsedDelta(Path deltaDir) {
+    String deltaDirName = deltaDir.getName();
+    if (deltaDirName.startsWith(DELETE_DELTA_PREFIX)) {
+      return parsedDelta(deltaDir, DELETE_DELTA_PREFIX);
+    }
+    return parsedDelta(deltaDir, DELTA_PREFIX); // default prefix is delta_prefix
+  }
+
+  private static ParsedDelta parseDelta(FileStatus path, String deltaPrefix) {
+    ParsedDelta p = parsedDelta(path.getPath(), deltaPrefix);
+    boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX);
+    return new ParsedDelta(p.getMinTransaction(),
+        p.getMaxTransaction(), path, p.statementId, isDeleteDelta);
+  }
+
+  public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix) {
     String filename = deltaDir.getName();
-    if (filename.startsWith(DELTA_PREFIX)) {
-      String rest = filename.substring(DELTA_PREFIX.length());
+    boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX);
+    if (filename.startsWith(deltaPrefix)) {
+      String rest = filename.substring(deltaPrefix.length());
       int split = rest.indexOf('_');
       int split2 = rest.indexOf('_', split + 1);//may be -1 if no statementId
       long min = Long.parseLong(rest.substring(0, split));
@@ -434,13 +698,13 @@ public class AcidUtils {
         Long.parseLong(rest.substring(split + 1)) :
         Long.parseLong(rest.substring(split + 1, split2));
       if(split2 == -1) {
-        return new ParsedDelta(min, max, null);
+        return new ParsedDelta(min, max, null, isDeleteDelta);
       }
       int statementId = Integer.parseInt(rest.substring(split2 + 1));
-      return new ParsedDelta(min, max, null, statementId);
+      return new ParsedDelta(min, max, null, statementId, isDeleteDelta);
     }
     throw new IllegalArgumentException(deltaDir + " does not start with " +
-                                       DELTA_PREFIX);
+                                       deltaPrefix);
   }
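
A short worked example of the name parsing above (the paths are illustrative; only the directory name matters to parsedDelta()):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    class ParsedDeltaSketch {
      public static void main(String[] args) {
        // Insert delta carrying a statement id suffix.
        AcidUtils.ParsedDelta d = AcidUtils.parsedDelta(new Path("/t/delta_0000005_0000010_0002"));
        System.out.println(d.getMinTransaction()); // 5
        System.out.println(d.getMaxTransaction()); // 10
        System.out.println(d.getStatementId());    // 2
        System.out.println(d.isDeleteDelta());     // false

        // Delete delta without a statement id.
        AcidUtils.ParsedDelta dd = AcidUtils.parsedDelta(new Path("/t/delete_delta_0000005_0000010"));
        System.out.println(dd.isDeleteDelta());    // true
        System.out.println(dd.getStatementId());   // 0, since no statement id was recorded
      }
    }
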
 
   /**
@@ -456,7 +720,8 @@ public class AcidUtils {
     for(FileStatus file: fs.listStatus(directory)) {
       String filename = file.getPath().getName();
       if (filename.startsWith(BASE_PREFIX) ||
-          filename.startsWith(DELTA_PREFIX)) {
+          filename.startsWith(DELTA_PREFIX) ||
+          filename.startsWith(DELETE_DELTA_PREFIX)) {
         if (file.isDir()) {
           return true;
         }
@@ -499,6 +764,7 @@ public class AcidUtils {
                                        boolean ignoreEmptyFiles
                                        ) throws IOException {
     FileSystem fs = directory.getFileSystem(conf);
+    // The following 'deltas' includes all kinds of delta files including insert & delete deltas.
     final List<ParsedDelta> deltas = new ArrayList<ParsedDelta>();
     List<ParsedDelta> working = new ArrayList<ParsedDelta>();
     List<FileStatus> originalDirectories = new ArrayList<FileStatus>();
@@ -553,6 +819,7 @@ public class AcidUtils {
    //subject to list of 'exceptions' in 'txnList' (not shown in above example).
     long current = bestBase.txn;
     int lastStmtId = -1;
+    ParsedDelta prev = null;
     for(ParsedDelta next: working) {
       if (next.maxTransaction > current) {
         // are any of the new transactions ones that we care about?
@@ -561,6 +828,7 @@ public class AcidUtils {
           deltas.add(next);
           current = next.maxTransaction;
           lastStmtId = next.statementId;
+          prev = next;
         }
       }
       else if(next.maxTransaction == current && lastStmtId >= 0) {
@@ -568,6 +836,24 @@ public class AcidUtils {
         //generate multiple delta files with the same txnId range
        //of course, if maxTransaction has already been minor compacted, all per statement deltas are obsolete
         deltas.add(next);
+        prev = next;
+      }
+      else if (prev != null && next.maxTransaction == prev.maxTransaction
+                  && next.minTransaction == prev.minTransaction
+                  && next.statementId == prev.statementId) {
+        // The 'next' parsedDelta may have everything equal to the 'prev' parsedDelta, except
+        // the path. This may happen when we have split update and we have two types of delta
+        // directories- 'delta_x_y' and 'delete_delta_x_y' for the SAME txn range.
+
+        // Also note that any delete_deltas in between a given delta_x_y range would be made
+        // obsolete. For example, a delta_30_50 would make delete_delta_40_40 obsolete.
+        // This is valid because minor compaction always compacts the normal deltas and the delete
+        // deltas for the same range. That is, if we had 3 directories, delta_30_30,
+        // delete_delta_40_40 and delta_50_50, then running minor compaction would produce
+        // delta_30_50 and delete_delta_30_50.
+
+        deltas.add(next);
+        prev = next;
       }
       else {
         obsolete.add(next.path);
@@ -638,7 +924,8 @@ public class AcidUtils {
   }
  private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId,
      ValidTxnList txnList, List<ParsedDelta> working, List<FileStatus> originalDirectories,
-      List<HdfsFileStatusWithId> original, List<FileStatus> obsolete, TxnBase bestBase, boolean ignoreEmptyFiles) {
+      List<HdfsFileStatusWithId> original, List<FileStatus> obsolete, TxnBase bestBase,
+      boolean ignoreEmptyFiles) throws IOException {
     Path p = child.getPath();
     String fn = p.getName();
     if (fn.startsWith(BASE_PREFIX) && child.isDir()) {
@@ -662,8 +949,11 @@ public class AcidUtils {
       } else {
         obsolete.add(child);
       }
-    } else if (fn.startsWith(DELTA_PREFIX) && child.isDir()) {
-      ParsedDelta delta = parseDelta(child);
+    } else if ((fn.startsWith(DELTA_PREFIX) || fn.startsWith(DELETE_DELTA_PREFIX))
+                    && child.isDir()) {
+      String deltaPrefix =
+              (fn.startsWith(DELTA_PREFIX)) ? DELTA_PREFIX : DELETE_DELTA_PREFIX;
+      ParsedDelta delta = parseDelta(child, deltaPrefix);
       if (txnList.isTxnRangeValid(delta.minTransaction,
           delta.maxTransaction) !=
           ValidTxnList.RangeResponse.NONE) {
@@ -791,4 +1081,84 @@ public class AcidUtils {
 
    return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
   }
+
+  /**
+   * Sets the acidOperationalProperties in the configuration object argument.
+   * @param conf Mutable configuration object
+   * @param properties An acidOperationalProperties object to initialize from.
+   */
+  public static void setAcidOperationalProperties(Configuration conf,
+          AcidOperationalProperties properties) {
+    if (properties != null) {
+      HiveConf.setIntVar(conf, ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES, properties.toInt());
+    }
+  }
+
+  /**
+   * Sets the acidOperationalProperties in the map object argument.
+   * @param parameters Mutable map object
+   * @param properties An acidOperationalProperties object to initialize from.
+   */
+  public static void setAcidOperationalProperties(
+          Map<String, String> parameters, AcidOperationalProperties properties) {
+    if (properties != null) {
+      parameters.put(ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, properties.toString());
+    }
+  }
+
+  /**
+   * Returns the acidOperationalProperties for a given table.
+   * @param table A table object
+   * @return the acidOperationalProperties object for the corresponding table.
+   */
+  public static AcidOperationalProperties getAcidOperationalProperties(Table table) {
+    String transactionalProperties = table.getProperty(
+            hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
+    if (transactionalProperties == null) {
+      // If the table does not define any transactional properties, we return a legacy type.
+      return AcidOperationalProperties.getLegacy();
+    }
+    return AcidOperationalProperties.parseString(transactionalProperties);
+  }
+
+  /**
+   * Returns the acidOperationalProperties for a given configuration.
+   * @param conf A configuration object
+   * @return the acidOperationalProperties object for the corresponding configuration.
+   */
+  public static AcidOperationalProperties getAcidOperationalProperties(Configuration conf) {
+    // If the conf does not define any transactional properties, the parseInt() should receive
+    // a value of zero, which will set AcidOperationalProperties to a legacy type and return that.
+    return AcidOperationalProperties.parseInt(
+            HiveConf.getIntVar(conf, ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES));
+  }
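
A sketch of the Configuration round trip given by setAcidOperationalProperties(Configuration, ...) and the getter above (per the comment in the getter, an unset HIVE_TXN_OPERATIONAL_PROPERTIES reads back as 0, i.e. legacy):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.io.AcidUtils;
    import org.apache.hadoop.hive.ql.io.AcidUtils.AcidOperationalProperties;

    class AcidConfSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Nothing set yet: falls back to legacy behaviour.
        System.out.println(AcidUtils.getAcidOperationalProperties(conf).isSplitUpdate()); // false

        // Push the table's properties into the conf, as HiveInputFormat does further below.
        AcidUtils.setAcidOperationalProperties(conf, AcidOperationalProperties.getDefault());
        System.out.println(AcidUtils.getAcidOperationalProperties(conf).isSplitUpdate()); // true
      }
    }
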
+
+  /**
+   * Returns the acidOperationalProperties for a given set of properties.
+   * @param props A properties object
+   * @return the acidOperationalProperties object for the corresponding properties.
+   */
+  public static AcidOperationalProperties getAcidOperationalProperties(Properties props) {
+    String resultStr = props.getProperty(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
+    if (resultStr == null) {
+      // If the properties object does not define any transactional properties, we return a legacy type.
+      return AcidOperationalProperties.getLegacy();
+    }
+    return AcidOperationalProperties.parseString(resultStr);
+  }
+
+  /**
+   * Returns the acidOperationalProperties for a given map.
+   * @param parameters A parameters object
+   * @return the acidOperationalProperties object for the corresponding map.
+   */
+  public static AcidOperationalProperties getAcidOperationalProperties(
+          Map<String, String> parameters) {
+    String resultStr = parameters.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
+    if (resultStr == null) {
+      // If the parameters map does not define any transactional properties, we return a legacy type.
+      return AcidOperationalProperties.getLegacy();
+    }
+    return AcidOperationalProperties.parseString(resultStr);
+  }
 }
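
To make the delta selection in getAcidState() concrete, here is the example from the comment above worked through (directory names illustrative, assuming the usual ParsedDelta ordering): starting from delta_30_30, delete_delta_40_40 and delta_50_50, minor compaction adds delta_30_50 and delete_delta_30_50 for the same range, and a subsequent read then resolves to

    deltas   : delta_30_50, delete_delta_30_50
    obsolete : delta_30_30, delete_delta_40_40, delta_50_50

The delta/delete_delta pair covering the identical txn range is exactly the case the new prev/next comparison keeps; everything subsumed by the compacted range is marked obsolete.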

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 945b828..c4b9940 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -637,6 +637,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
         pushFilters(jobConf, ts);
 
         AcidUtils.setTransactionalTableScan(job, ts.getConf().isAcidTable());
+        AcidUtils.setAcidOperationalProperties(job, ts.getConf().getAcidOperationalProperties());
       }
     }
   }
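
For context, a sketch (not part of this hunk; the method name is hypothetical) of how the split-generation side can read the value back out of the same JobConf that HiveInputFormat populated above:

    import org.apache.hadoop.hive.ql.io.AcidUtils;
    import org.apache.hadoop.mapred.JobConf;

    class ReadBackSketch {
      // JobConf extends Configuration, so the getter added to AcidUtils applies directly.
      static boolean useSplitUpdatePath(JobConf jobConf) {
        return AcidUtils.getAcidOperationalProperties(jobConf).isSplitUpdate();
      }
    }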
