Repository: hive
Updated Branches:
  refs/heads/master 333fa8763 -> ecab0d072


http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java 
b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
new file mode 100644
index 0000000..becb22a
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
@@ -0,0 +1,584 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Same as TestTxnCommands2, but tests ACID tables with 'transactional_properties' set to 'default'.
+ * This verifies that ACID tables with split-update turned on behave correctly for the same set of
+ * tests that are run with it turned off. It also adds a few tests for behaviors that are specific
+ * to ACID tables with split-update enabled.
+ */
+public class TestTxnCommands2WithSplitUpdate extends TestTxnCommands2 {
+
+  public TestTxnCommands2WithSplitUpdate() {
+    super();
+  }
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    
setUpWithTableProperties("'transactional'='true','transactional_properties'='default'");
+  }
+
+  @Override
+  @Test
+  public void testOrcPPD() throws Exception  {
+    final String defaultUnset = "unset";
+    String oldSplitStrategyValue = 
hiveConf.get(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, defaultUnset);
+    // TODO: Setting the split strategy to 'BI' is a workaround for HIVE-14448 until it is resolved.
+    hiveConf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
+
+    super.testOrcPPD();
+
+    // Restore the previous value for split strategy, or unset if not 
previously set.
+    if (oldSplitStrategyValue.equals(defaultUnset)) {
+      hiveConf.unset(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname);
+    } else {
+      hiveConf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, 
oldSplitStrategyValue);
+    }
+  }
+
+  @Override
+  @Test
+  public void testOrcNoPPD() throws Exception {
+    final String defaultUnset = "unset";
+    String oldSplitStrategyValue = 
hiveConf.get(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, defaultUnset);
+    // TODO: Setting the split strategy to 'BI' is a workaround for HIVE-14448 until it is resolved.
+    hiveConf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
+
+    super.testOrcNoPPD();
+
+    // Restore the previous value for split strategy, or unset if not 
previously set.
+    if (oldSplitStrategyValue.equals(defaultUnset)) {
+      hiveConf.unset(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname);
+    } else {
+      hiveConf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, 
oldSplitStrategyValue);
+    }
+  }
+
+  @Override
+  @Test
+  public void testInitiatorWithMultipleFailedCompactions() throws Exception {
+    // Test with split-update turned on.
+    
testInitiatorWithMultipleFailedCompactionsForVariousTblProperties("'transactional'='true','transactional_properties'='default'");
+  }
+
+  @Override
+  @Test
+  public void writeBetweenWorkerAndCleaner() throws Exception {
+    
writeBetweenWorkerAndCleanerForVariousTblProperties("'transactional'='true','transactional_properties'='default'");
+  }
+
+  @Override
+  @Test
+  public void testACIDwithSchemaEvolutionAndCompaction() throws Exception {
+    
testACIDwithSchemaEvolutionForVariousTblProperties("'transactional'='true','transactional_properties'='default'");
+  }
+
+  /**
+   * In the current implementation of ACID, altering the value of transactional_properties on an
+   * ACID table, or setting it when it was previously unset, will throw an exception.
+   * @throws Exception
+   */
+  @Test
+  public void testFailureOnAlteringTransactionalProperties() throws Exception {
+    expectedException.expect(RuntimeException.class);
+    expectedException.expectMessage("TBLPROPERTIES with 
'transactional_properties' cannot be altered after the table is created");
+    runStatementOnDriver("create table acidTblLegacy (a int, b int) clustered 
by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES 
('transactional'='true')");
+    runStatementOnDriver("alter table acidTblLegacy SET TBLPROPERTIES 
('transactional_properties' = 'default')");
+  }
+
+  /**
+   * Test the query correctness and directory layout for ACID table conversion with
+   * split-update enabled.
+   * 1. Insert a row to Non-ACID table
+   * 2. Convert Non-ACID to ACID table with split-update enabled
+   * 3. Insert a row to ACID table
+   * 4. Perform Major compaction
+   * 5. Clean
+   * @throws Exception
+   */
+  @Test
+  public void testNonAcidToAcidSplitUpdateConversion1() throws Exception {
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] status;
+
+    // 1. Insert a row to Non-ACID table
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) 
values(1,2)");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 original bucket files in the location (000000_0 and 
000001_0)
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    List<String> rs = runStatementOnDriver("select a,b from " + 
Table.NONACIDORCTBL);
+    int [][] resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    int resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 2. Convert NONACIDORCTBL to ACID table
+    runStatementOnDriver("alter table " + Table.NONACIDORCTBL
+        + " SET TBLPROPERTIES ('transactional'='true', 
'transactional_properties'='default')");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+    // Everything should be same as before
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 3. Insert another row to newly-converted ACID table
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) 
values(3,4)");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 original bucket files (000000_0 and 000001_0), plus a 
new delta directory.
+    // The delta directory should also have only 1 bucket file (bucket_00001)
+    Assert.assertEquals(3, status.length);
+    boolean sawNewDelta = false;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("delta_.*")) {
+        sawNewDelta = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), 
FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(1, buckets.length); // only one bucket file
+        
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
+      } else {
+        
Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+      }
+    }
+    Assert.assertTrue(sawNewDelta);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL + " 
order by a,b");
+    resultData = new int[][] {{1, 2}, {3, 4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 4. Perform a major compaction
+    runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 
'MAJOR'");
+    runWorker(hiveConf);
+    // There should be 1 new directory: base_xxxxxxx.
+    // Original bucket files and delta directory should stay until Cleaner 
kicks in.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(4, status.length);
+    boolean sawNewBase = false;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("base_.*")) {
+        sawNewBase = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), 
FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(1, buckets.length);
+        
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
+      }
+    }
+    Assert.assertTrue(sawNewBase);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}, {3, 4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 5. Let Cleaner delete obsolete files/dirs
+    // Note, here we create a fake directory along with fake files as original 
directories/files
+    String fakeFile0 = TEST_WAREHOUSE_DIR + "/" + 
(Table.NONACIDORCTBL).toString().toLowerCase() +
+        "/subdir/000000_0";
+    String fakeFile1 = TEST_WAREHOUSE_DIR + "/" + 
(Table.NONACIDORCTBL).toString().toLowerCase() +
+        "/subdir/000000_1";
+    fs.create(new Path(fakeFile0));
+    fs.create(new Path(fakeFile1));
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+    // Before Cleaner, there should be 5 items:
+    // 2 original files, 1 original directory, 1 base directory and 1 delta 
directory
+    Assert.assertEquals(5, status.length);
+    runCleaner(hiveConf);
+    // There should be only 1 directory left: base_xxxxxxx.
+    // Original bucket files and delta directory should have been cleaned up.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(1, status.length);
+    Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
+    FileStatus[] buckets = fs.listStatus(status[0].getPath(), 
FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(1, buckets.length);
+    Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}, {3, 4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+  }
+
+  /**
+   * Test the query correctness and directory layout for ACID table conversion with
+   * split-update enabled.
+   * 1. Insert a row to Non-ACID table
+   * 2. Convert Non-ACID to ACID table with split-update enabled
+   * 3. Update the existing row in ACID table
+   * 4. Perform Major compaction
+   * 5. Clean
+   * @throws Exception
+   */
+  @Test
+  public void testNonAcidToAcidSplitUpdateConversion2() throws Exception {
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] status;
+
+    // 1. Insert a row to Non-ACID table
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) 
values(1,2)");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 original bucket files in the location (000000_0 and 
000001_0)
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    List<String> rs = runStatementOnDriver("select a,b from " + 
Table.NONACIDORCTBL);
+    int [][] resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    int resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 2. Convert NONACIDORCTBL to ACID table
+    runStatementOnDriver("alter table " + Table.NONACIDORCTBL
+        + " SET TBLPROPERTIES ('transactional'='true', 
'transactional_properties'='default')");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+    // Everything should be same as before
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 3. Update the existing row in newly-converted ACID table
+    runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where 
a=1");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 original bucket files (000000_0 and 000001_0), plus one delta directory
+    // and one delete_delta directory. When split-update is enabled, an update event is split into
+    // a delete and an insert, and the delete part is what generates the delete_delta directory.
+    // The delta directory and the delete_delta directory should each contain 1 bucket file.
+    Assert.assertEquals(4, status.length);
+    boolean sawNewDelta = false;
+    boolean sawNewDeleteDelta = false;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("delta_.*")) {
+        sawNewDelta = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), 
FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+        
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+      } else if (status[i].getPath().getName().matches("delete_delta_.*")) {
+        sawNewDeleteDelta = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), 
FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+        
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+      } else {
+        
Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+      }
+    }
+    Assert.assertTrue(sawNewDelta);
+    Assert.assertTrue(sawNewDeleteDelta);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 3}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 4. Perform a major compaction
+    runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 
'MAJOR'");
+    runWorker(hiveConf);
+    // There should be 1 new directory: base_0000001.
+    // Original bucket files, the delta directory and the delete_delta directory should stay
+    // until Cleaner kicks in.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(5, status.length);
+    boolean sawNewBase = false;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("base_.*")) {
+        sawNewBase = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), 
FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+        
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
+      }
+    }
+    Assert.assertTrue(sawNewBase);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 3}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 5. Let Cleaner delete obsolete files/dirs
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+    // Before Cleaner, there should be 5 items:
+    // 2 original files, 1 delta directory, 1 delete_delta directory and 1 
base directory
+    Assert.assertEquals(5, status.length);
+    runCleaner(hiveConf);
+    // There should be only 1 directory left: base_0000001.
+    // Original bucket files, delta directory and delete_delta should have 
been cleaned up.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(1, status.length);
+    Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
+    FileStatus[] buckets = fs.listStatus(status[0].getPath(), 
FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+    Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 3}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+  }
+
+  /**
+   * Test the query correctness and directory layout for ACID table conversion with
+   * split-update enabled.
+   * 1. Insert a row to Non-ACID table
+   * 2. Convert Non-ACID to ACID table with split-update enabled
+   * 3. Perform Major compaction
+   * 4. Update the existing row and insert a new row to the ACID table
+   * 5. Perform another Major compaction
+   * 6. Clean
+   * @throws Exception
+   */
+  @Test
+  public void testNonAcidToAcidSplitUpdateConversion3() throws Exception {
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] status;
+
+    // 1. Insert a row to Non-ACID table
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) 
values(1,2)");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 original bucket files in the location (000000_0 and 
000001_0)
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    List<String> rs = runStatementOnDriver("select a,b from " + 
Table.NONACIDORCTBL);
+    int [][] resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    int resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 2. Convert NONACIDORCTBL to ACID table with split_update enabled. 
(txn_props=default)
+    runStatementOnDriver("alter table " + Table.NONACIDORCTBL
+        + " SET TBLPROPERTIES ('transactional'='true', 
'transactional_properties'='default')");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+    // Everything should be same as before
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 3. Perform a major compaction
+    runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 
'MAJOR'");
+    runWorker(hiveConf);
+    // There should be 1 new directory: base_-9223372036854775808
+    // Original bucket files should stay until Cleaner kicks in.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(3, status.length);
+    boolean sawNewBase = false;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("base_.*")) {
+        Assert.assertEquals("base_-9223372036854775808", 
status[i].getPath().getName());
+        sawNewBase = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), 
FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+        Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+      }
+    }
+    Assert.assertTrue(sawNewBase);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 4. Update the existing row, and insert another row to newly-converted 
ACID table
+    runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where 
a=1");
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) 
values(3,4)");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+    Arrays.sort(status);  // make sure delta_0000001_0000001_0000 appears 
before delta_0000002_0000002_0000
+    // There should be 2 original bucket files (000000_0 and 000001_0), a base 
directory,
+    // plus two new delta directories and one delete_delta directory that 
would be created due to
+    // the update statement (remember split-update U=D+I)!
+    Assert.assertEquals(6, status.length);
+    int numDelta = 0;
+    int numDeleteDelta = 0;
+    sawNewBase = false;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("delta_.*")) {
+        numDelta++;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), 
FileUtils.STAGING_DIR_PATH_FILTER);
+        Arrays.sort(buckets);
+        if (numDelta == 1) {
+          Assert.assertEquals("delta_0000001_0000001_0000", 
status[i].getPath().getName());
+          Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+        } else if (numDelta == 2) {
+          Assert.assertEquals("delta_0000002_0000002_0000", 
status[i].getPath().getName());
+          Assert.assertEquals(1, buckets.length);
+          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+        }
+      } else if (status[i].getPath().getName().matches("delete_delta_.*")) {
+        numDeleteDelta++;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), 
FileUtils.STAGING_DIR_PATH_FILTER);
+        Arrays.sort(buckets);
+        if (numDeleteDelta == 1) {
+          Assert.assertEquals("delete_delta_0000001_0000001_0000", 
status[i].getPath().getName());
+          Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+        }
+      } else if (status[i].getPath().getName().matches("base_.*")) {
+        Assert.assertEquals("base_-9223372036854775808", 
status[i].getPath().getName());
+        sawNewBase = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), 
FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+        Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+      } else {
+        
Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+      }
+    }
+    Assert.assertEquals(2, numDelta);
+    Assert.assertEquals(1, numDeleteDelta);
+    Assert.assertTrue(sawNewBase);
+
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 3}, {3, 4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 5. Perform another major compaction
+    runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 
'MAJOR'");
+    runWorker(hiveConf);
+    // There should be 1 new base directory: base_0000002.
+    // Original bucket files, delta directories, delete_delta directories and the
+    // previous base directory should stay until Cleaner kicks in.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+    Arrays.sort(status);
+    Assert.assertEquals(7, status.length);
+    int numBase = 0;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("base_.*")) {
+        numBase++;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), 
FileUtils.STAGING_DIR_PATH_FILTER);
+        Arrays.sort(buckets);
+        if (numBase == 1) {
+          Assert.assertEquals("base_-9223372036854775808", 
status[i].getPath().getName());
+          Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+        } else if (numBase == 2) {
+          // The new base dir should have a single bucket file (bucket_00001), like the delta dirs
+          Assert.assertEquals("base_0000002", status[i].getPath().getName());
+          Assert.assertEquals(1, buckets.length);
+          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+        }
+      }
+    }
+    Assert.assertEquals(2, numBase);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 3}, {3, 4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 6. Let Cleaner delete obsolete files/dirs
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+    // Before Cleaner, there should be 7 items:
+    // 2 original files, 2 delta directories, 1 delete_delta directory and 2 base directories
+    Assert.assertEquals(7, status.length);
+    runCleaner(hiveConf);
+    // There should be only 1 directory left: base_0000002.
+    // Original bucket files, delta directories, the delete_delta directory and the previous
+    // base directory should have been cleaned up.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(1, status.length);
+    Assert.assertEquals("base_0000002", status[0].getPath().getName());
+    FileStatus[] buckets = fs.listStatus(status[0].getPath(), 
FileUtils.STAGING_DIR_PATH_FILTER);
+    Arrays.sort(buckets);
+    Assert.assertEquals(1, buckets.length);
+    Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 3}, {3, 4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+  }
+}
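Note on the overridden testOrcPPD/testOrcNoPPD above: they save, override, and then restore HIVE_ORC_SPLIT_STRATEGY around the parent tests. A minimal sketch of that pattern, using only calls that appear in the diff (the hiveConf field and the "unset" sentinel come from the test class itself; the try/finally is an addition that would also restore the value if the parent test throws):

    final String defaultUnset = "unset";
    String oldValue = hiveConf.get(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, defaultUnset);
    hiveConf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
    try {
      super.testOrcPPD();  // run the inherited test with the overridden split strategy
    } finally {
      // Restore the previous value, or unset it if it was not previously set.
      if (defaultUnset.equals(oldValue)) {
        hiveConf.unset(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname);
      } else {
        hiveConf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, oldValue);
      }
    }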

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java 
b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index 556df18..a7ff9a3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -17,11 +17,21 @@
  */
 package org.apache.hadoop.hive.ql.io;
 
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidCompactorTxnList;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.io.AcidUtils.AcidOperationalProperties;
 import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat;
 import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFile;
 import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFileSystem;
@@ -30,13 +40,6 @@ import 
org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 public class TestAcidUtils {
 
   @Test
@@ -60,12 +63,23 @@ public class TestAcidUtils {
     options.writingBase(false);
     assertEquals("/tmp/delta_0000100_0000200_0000/bucket_00023",
       AcidUtils.createFilename(p, options).toString());
+    options.writingDeleteDelta(true);
+    assertEquals("/tmp/delete_delta_0000100_0000200_0000/bucket_00023",
+        AcidUtils.createFilename(p, options).toString());
+    options.writingDeleteDelta(false);
     options.statementId(-1);
     assertEquals("/tmp/delta_0000100_0000200/bucket_00023",
       AcidUtils.createFilename(p, options).toString());
+    options.writingDeleteDelta(true);
+    assertEquals("/tmp/delete_delta_0000100_0000200/bucket_00023",
+        AcidUtils.createFilename(p, options).toString());
+    options.writingDeleteDelta(false);
     options.statementId(7);
     assertEquals("/tmp/delta_0000100_0000200_0007/bucket_00023",
       AcidUtils.createFilename(p, options).toString());
+    options.writingDeleteDelta(true);
+    assertEquals("/tmp/delete_delta_0000100_0000200_0007/bucket_00023",
+        AcidUtils.createFilename(p, options).toString());
   }
   @Test
   public void testCreateFilenameLargeIds() throws Exception {
@@ -86,7 +100,6 @@ public class TestAcidUtils {
     assertEquals("/tmp/delta_1234567880_1234567890_0000/bucket_00023",
       AcidUtils.createFilename(p, options).toString());
   }
-  
 
   @Test
   public void testParsing() throws Exception {
@@ -94,19 +107,34 @@ public class TestAcidUtils {
     Path dir = new Path("/tmp/tbl");
     Configuration conf = new Configuration();
     AcidOutputFormat.Options opts =
-        AcidUtils.parseBaseBucketFilename(new Path(dir, "base_567/bucket_123"),
+        AcidUtils.parseBaseOrDeltaBucketFilename(new Path(dir, 
"base_567/bucket_123"),
             conf);
     assertEquals(false, opts.getOldStyle());
     assertEquals(true, opts.isWritingBase());
     assertEquals(567, opts.getMaximumTransactionId());
     assertEquals(0, opts.getMinimumTransactionId());
     assertEquals(123, opts.getBucket());
-    opts = AcidUtils.parseBaseBucketFilename(new Path(dir, "000123_0"), conf);
+    opts = AcidUtils.parseBaseOrDeltaBucketFilename(new Path(dir, 
"delta_000005_000006/bucket_00001"),
+        conf);
+    assertEquals(false, opts.getOldStyle());
+    assertEquals(false, opts.isWritingBase());
+    assertEquals(6, opts.getMaximumTransactionId());
+    assertEquals(5, opts.getMinimumTransactionId());
+    assertEquals(1, opts.getBucket());
+    opts = AcidUtils.parseBaseOrDeltaBucketFilename(new Path(dir, 
"delete_delta_000005_000006/bucket_00001"),
+        conf);
+    assertEquals(false, opts.getOldStyle());
+    assertEquals(false, opts.isWritingBase());
+    assertEquals(6, opts.getMaximumTransactionId());
+    assertEquals(5, opts.getMinimumTransactionId());
+    assertEquals(1, opts.getBucket());
+    opts = AcidUtils.parseBaseOrDeltaBucketFilename(new Path(dir, "000123_0"), 
conf);
     assertEquals(true, opts.getOldStyle());
     assertEquals(true, opts.isWritingBase());
     assertEquals(123, opts.getBucket());
     assertEquals(0, opts.getMinimumTransactionId());
     assertEquals(0, opts.getMaximumTransactionId());
+
   }
 
   @Test
@@ -471,5 +499,230 @@ public class TestAcidUtils {
     assertEquals("mock:/tbl/part1/delta_1_1", 
delts.get(0).getPath().toString());
   }
 
+  @Test
+  public void testBaseWithDeleteDeltas() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+        AcidOperationalProperties.getDefault().toInt());
+    MockFileSystem fs = new MockFileSystem(conf,
+        new MockFile("mock:/tbl/part1/base_5/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/base_10/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/base_49/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_025_025/bucket_0", 0, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_029_029/bucket_0", 0, new byte[0]),
+        new MockFile("mock:/tbl/part1/delete_delta_029_029/bucket_0", 0, new 
byte[0]),
+        new MockFile("mock:/tbl/part1/delta_025_030/bucket_0", 0, new byte[0]),
+        new MockFile("mock:/tbl/part1/delete_delta_025_030/bucket_0", 0, new 
byte[0]),
+        new MockFile("mock:/tbl/part1/delta_050_105/bucket_0", 0, new byte[0]),
+        new MockFile("mock:/tbl/part1/delete_delta_050_105/bucket_0", 0, new 
byte[0]),
+        new MockFile("mock:/tbl/part1/delete_delta_110_110/bucket_0", 0, new 
byte[0]));
+    AcidUtils.Directory dir =
+        AcidUtils.getAcidState(new TestInputOutputFormat.MockPath(fs,
+            "mock:/tbl/part1"), conf, new ValidReadTxnList("100:" + 
Long.MAX_VALUE + ":"));
+    assertEquals("mock:/tbl/part1/base_49", dir.getBaseDirectory().toString());
+    List<FileStatus> obsolete = dir.getObsolete();
+    assertEquals(7, obsolete.size());
+    assertEquals("mock:/tbl/part1/base_10", 
obsolete.get(0).getPath().toString());
+    assertEquals("mock:/tbl/part1/base_5", 
obsolete.get(1).getPath().toString());
+    assertEquals("mock:/tbl/part1/delete_delta_025_030", 
obsolete.get(2).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_025_030", 
obsolete.get(3).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_025_025", 
obsolete.get(4).getPath().toString());
+    assertEquals("mock:/tbl/part1/delete_delta_029_029", 
obsolete.get(5).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_029_029", 
obsolete.get(6).getPath().toString());
+    assertEquals(0, dir.getOriginalFiles().size());
+    List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
+    assertEquals(2, deltas.size());
+    assertEquals("mock:/tbl/part1/delete_delta_050_105", 
deltas.get(0).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_050_105", 
deltas.get(1).getPath().toString());
+    // The delete_delta_110_110 should not be read because it is greater than 
the high watermark.
+  }
+
+  @Test
+  public void testOverlapingDeltaAndDeleteDelta() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+        AcidOperationalProperties.getDefault().toInt());
+    MockFileSystem fs = new MockFileSystem(conf,
+        new MockFile("mock:/tbl/part1/delta_0000063_63/bucket_0", 500, new 
byte[0]),
+        new MockFile("mock:/tbl/part1/delta_000062_62/bucket_0", 500, new 
byte[0]),
+        new MockFile("mock:/tbl/part1/delta_00061_61/bucket_0", 500, new 
byte[0]),
+        new MockFile("mock:/tbl/part1/delete_delta_00064_64/bucket_0", 500, 
new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_40_60/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delete_delta_40_60/bucket_0", 500, new 
byte[0]),
+        new MockFile("mock:/tbl/part1/delta_0060_60/bucket_0", 500, new 
byte[0]),
+        new MockFile("mock:/tbl/part1/delta_052_55/bucket_0", 500, new 
byte[0]),
+        new MockFile("mock:/tbl/part1/delete_delta_052_55/bucket_0", 500, new 
byte[0]),
+        new MockFile("mock:/tbl/part1/base_50/bucket_0", 500, new byte[0]));
+    Path part = new MockPath(fs, "mock:/tbl/part1");
+    AcidUtils.Directory dir =
+        AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:" + 
Long.MAX_VALUE + ":"));
+    assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
+    List<FileStatus> obsolete = dir.getObsolete();
+    assertEquals(3, obsolete.size());
+    assertEquals("mock:/tbl/part1/delete_delta_052_55", 
obsolete.get(0).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_052_55", 
obsolete.get(1).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_0060_60", 
obsolete.get(2).getPath().toString());
+    List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
+    assertEquals(6, delts.size());
+    assertEquals("mock:/tbl/part1/delete_delta_40_60", 
delts.get(0).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_40_60", 
delts.get(1).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_00061_61", 
delts.get(2).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_000062_62", 
delts.get(3).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_0000063_63", 
delts.get(4).getPath().toString());
+    assertEquals("mock:/tbl/part1/delete_delta_00064_64", 
delts.get(5).getPath().toString());
+  }
 
-}
+  @Test
+  public void testMinorCompactedDeltaMakesInBetweenDelteDeltaObsolete() throws Exception {
+    // This test checks that if we have a minor-compacted delta for the txn range [40, 60],
+    // then any delete_delta within that range is made obsolete.
+    Configuration conf = new Configuration();
+    conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+        AcidUtils.AcidOperationalProperties.getDefault().toInt());
+    MockFileSystem fs = new MockFileSystem(conf,
+        new MockFile("mock:/tbl/part1/delta_40_60/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delete_delta_50_50/bucket_0", 500, new 
byte[0]));
+    Path part = new MockPath(fs, "mock:/tbl/part1");
+    AcidUtils.Directory dir =
+        AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:" + 
Long.MAX_VALUE + ":"));
+    List<FileStatus> obsolete = dir.getObsolete();
+    assertEquals(1, obsolete.size());
+    assertEquals("mock:/tbl/part1/delete_delta_50_50", 
obsolete.get(0).getPath().toString());
+    List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
+    assertEquals(1, delts.size());
+    assertEquals("mock:/tbl/part1/delta_40_60", 
delts.get(0).getPath().toString());
+  }
+
+  @Test
+  public void deltasAndDeleteDeltasWithOpenTxnsNotInCompact() throws Exception {
+    // This test checks that the appropriate delta and delete_delta directories are included
+    // when a minor compaction specifies a valid open txn range.
+    Configuration conf = new Configuration();
+    conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+        AcidUtils.AcidOperationalProperties.getDefault().toInt());
+    MockFileSystem fs = new MockFileSystem(conf,
+        new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delete_delta_2_2/bucket_0", 500, new 
byte[0]),
+        new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delete_delta_2_5/bucket_0", 500, new 
byte[0]),
+        new MockFile("mock:/tbl/part1/delta_2_5/bucket_0" + 
AcidUtils.DELTA_SIDE_FILE_SUFFIX, 500,
+            new byte[0]),
+        new MockFile("mock:/tbl/part1/delete_delta_7_7/bucket_0", 500, new 
byte[0]),
+        new MockFile("mock:/tbl/part1/delta_6_10/bucket_0", 500, new byte[0]));
+    Path part = new MockPath(fs, "mock:/tbl/part1");
+    AcidUtils.Directory dir =
+        AcidUtils.getAcidState(part, conf, new ValidCompactorTxnList("4:" + 
Long.MAX_VALUE + ":"));
+    List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
+    assertEquals(2, delts.size());
+    assertEquals("mock:/tbl/part1/delta_1_1", 
delts.get(0).getPath().toString());
+    assertEquals("mock:/tbl/part1/delete_delta_2_2", 
delts.get(1).getPath().toString());
+  }
+
+  @Test
+  public void deleteDeltasWithOpenTxnInRead() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+        AcidUtils.AcidOperationalProperties.getDefault().toInt());
+    MockFileSystem fs = new MockFileSystem(conf,
+      new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/delete_delta_2_5/bucket_0", 500, new 
byte[0]),
+      new MockFile("mock:/tbl/part1/delete_delta_3_3/bucket_0", 500, new 
byte[0]),
+      new MockFile("mock:/tbl/part1/delta_4_4_1/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/delta_4_4_3/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/delta_101_101_1/bucket_0", 500, new 
byte[0]));
+    Path part = new MockPath(fs, "mock:/tbl/part1");
+    AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new 
ValidReadTxnList("100:4:4"));
+    List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
+    assertEquals(3, delts.size());
+    assertEquals("mock:/tbl/part1/delta_1_1", 
delts.get(0).getPath().toString());
+    assertEquals("mock:/tbl/part1/delete_delta_2_5", 
delts.get(1).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_2_5", 
delts.get(2).getPath().toString());
+    // Note that delete_delta_3_3 should not be read, when a minor compacted
+    // [delete_]delta_2_5 is present.
+  }
+
+  @Test
+  public void testDeleteDeltaSubdirPathGeneration() throws Exception {
+    String deleteDeltaSubdirPath = AcidUtils.deleteDeltaSubdir(1, 10);
+    assertEquals("delete_delta_0000001_0000010", deleteDeltaSubdirPath);
+    deleteDeltaSubdirPath = AcidUtils.deleteDeltaSubdir(1, 10, 5);
+    assertEquals("delete_delta_0000001_0000010_0005", deleteDeltaSubdirPath);
+  }
+
+  @Test
+  public void testDeleteEventDeltaDirPathFilter() throws Exception {
+    Path positivePath = new Path("delete_delta_000001_000010");
+    Path negativePath = new Path("delta_000001_000010");
+    assertEquals(true, 
AcidUtils.deleteEventDeltaDirFilter.accept(positivePath));
+    assertEquals(false, 
AcidUtils.deleteEventDeltaDirFilter.accept(negativePath));
+  }
+
+  @Test
+  public void testAcidOperationalProperties() throws Exception {
+    AcidUtils.AcidOperationalProperties testObj = 
AcidUtils.AcidOperationalProperties.getLegacy();
+    assertsForAcidOperationalProperties(testObj, "legacy");
+
+    testObj = AcidUtils.AcidOperationalProperties.getDefault();
+    assertsForAcidOperationalProperties(testObj, "default");
+
+    testObj = AcidUtils.AcidOperationalProperties.parseInt(0);
+    assertsForAcidOperationalProperties(testObj, "legacy");
+
+    testObj = AcidUtils.AcidOperationalProperties.parseInt(1);
+    assertsForAcidOperationalProperties(testObj, "split_update");
+
+    testObj = AcidUtils.AcidOperationalProperties.parseString("legacy");
+    assertsForAcidOperationalProperties(testObj, "legacy");
+
+    testObj = AcidUtils.AcidOperationalProperties.parseString("default");
+    assertsForAcidOperationalProperties(testObj, "default");
+
+  }
+
+  private void 
assertsForAcidOperationalProperties(AcidUtils.AcidOperationalProperties testObj,
+      String type) throws Exception {
+    switch(type) {
+      case "split_update":
+      case "default":
+        assertEquals(true, testObj.isSplitUpdate());
+        assertEquals(false, testObj.isHashBasedMerge());
+        assertEquals(1, testObj.toInt());
+        assertEquals("|split_update", testObj.toString());
+        break;
+      case "legacy":
+        assertEquals(false, testObj.isSplitUpdate());
+        assertEquals(false, testObj.isHashBasedMerge());
+        assertEquals(0, testObj.toInt());
+        assertEquals("", testObj.toString());
+        break;
+      default:
+        break;
+    }
+  }
+
+  @Test
+  public void testAcidOperationalPropertiesSettersAndGetters() throws 
Exception {
+    AcidUtils.AcidOperationalProperties oprProps = 
AcidUtils.AcidOperationalProperties.getDefault();
+    Configuration testConf = new Configuration();
+    // Test setter for configuration object.
+    AcidUtils.setAcidOperationalProperties(testConf, oprProps);
+    assertEquals(1, 
testConf.getInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, 0));
+    // Test getter for configuration object.
+    assertEquals(oprProps.toString(), 
AcidUtils.getAcidOperationalProperties(testConf).toString());
+
+    Map<String, String> parameters = new HashMap<String, String>();
+    // Test setter for map object.
+    AcidUtils.setAcidOperationalProperties(parameters, oprProps);
+    assertEquals(oprProps.toString(),
+        
parameters.get(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname));
+    // Test getter for map object.
+    // Calling a get on 'parameters' will still return the legacy type because the map-based
+    // setter and getter work on different string keys: the setter writes
+    // HIVE_TXN_OPERATIONAL_PROPERTIES, while the getter reads TABLE_TRANSACTIONAL_PROPERTIES.
+    assertEquals(0, 
AcidUtils.getAcidOperationalProperties(parameters).toInt());
+    parameters.put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, 
oprProps.toString());
+    // Set the appropriate key in the map and test that we are able to read it 
back correctly.
+    assertEquals(1, 
AcidUtils.getAcidOperationalProperties(parameters).toInt());
+  }
+}
\ No newline at end of file
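For reference, a minimal usage sketch of AcidUtils.AcidOperationalProperties as exercised by the new tests above. It uses only the calls that appear in this diff (getDefault, parseInt, parseString, isSplitUpdate, toInt, toString, and the Configuration-based setter/getter), and the expected values mirror the assertions in testAcidOperationalProperties; treat it as illustrative rather than a reference for the full API:

    // 'default' turns the split_update bit on: int form 1, string form "|split_update".
    AcidUtils.AcidOperationalProperties props = AcidUtils.AcidOperationalProperties.getDefault();
    assertEquals(true, props.isSplitUpdate());
    assertEquals(1, props.toInt());
    assertEquals("|split_update", props.toString());

    // 'legacy' encodes to 0 / "" and does not use split-update.
    assertEquals(false, AcidUtils.AcidOperationalProperties.parseInt(0).isSplitUpdate());
    assertEquals(false, AcidUtils.AcidOperationalProperties.parseString("legacy").isSplitUpdate());

    // Round-trip through a Configuration object, as the setters/getters test does.
    Configuration conf = new Configuration();
    AcidUtils.setAcidOperationalProperties(conf, props);
    assertEquals(1, AcidUtils.getAcidOperationalProperties(conf).toInt());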

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java 
b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index 6648829..2c1bb6f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -520,7 +520,9 @@ public class TestInputOutputFormat {
               conf, n);
           OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(
               context, fs, new MockPath(fs, "mock:/a/b"), false, null);
-          final SplitStrategy splitStrategy = createSplitStrategy(context, 
gen);
+          List<SplitStrategy<?>> splitStrategies = 
createSplitStrategies(context, gen);
+          assertEquals(1, splitStrategies.size());
+          final SplitStrategy splitStrategy = splitStrategies.get(0);
           assertTrue(
               String.format(
                   "Split strategy for %d files x %d size for %d splits", c, s,
@@ -541,7 +543,9 @@ public class TestInputOutputFormat {
               conf, n);
           OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(
               context, fs, new MockPath(fs, "mock:/a/b"), false, null);
-          final SplitStrategy splitStrategy = createSplitStrategy(context, 
gen);
+          List<SplitStrategy<?>> splitStrategies = 
createSplitStrategies(context, gen);
+          assertEquals(1, splitStrategies.size());
+          final SplitStrategy splitStrategy = splitStrategies.get(0);
           assertTrue(
               String.format(
                   "Split strategy for %d files x %d size for %d splits", c, s,
@@ -565,8 +569,9 @@ public class TestInputOutputFormat {
     OrcInputFormat.FileGenerator gen =
       new OrcInputFormat.FileGenerator(context, fs,
           new MockPath(fs, "mock:/a/b"), false, null);
-    OrcInputFormat.SplitStrategy splitStrategy = createSplitStrategy(context, 
gen);
-    assertEquals(true, splitStrategy instanceof 
OrcInputFormat.BISplitStrategy);
+    List<OrcInputFormat.SplitStrategy<?>> splitStrategies = 
createSplitStrategies(context, gen);
+    assertEquals(1, splitStrategies.size());
+    assertEquals(true, splitStrategies.get(0) instanceof 
OrcInputFormat.BISplitStrategy);
 
     conf.set("mapreduce.input.fileinputformat.split.maxsize", "500");
     context = new OrcInputFormat.Context(conf);
@@ -578,8 +583,9 @@ public class TestInputOutputFormat {
         new MockFile("mock:/a/b/part-04", 1000, new byte[1000]));
     gen = new OrcInputFormat.FileGenerator(context, fs,
             new MockPath(fs, "mock:/a/b"), false, null);
-    splitStrategy = createSplitStrategy(context, gen);
-    assertEquals(true, splitStrategy instanceof 
OrcInputFormat.ETLSplitStrategy);
+    splitStrategies = createSplitStrategies(context, gen);
+    assertEquals(1, splitStrategies.size());
+    assertEquals(true, splitStrategies.get(0) instanceof 
OrcInputFormat.ETLSplitStrategy);
   }
 
   @Test
@@ -594,9 +600,9 @@ public class TestInputOutputFormat {
     OrcInputFormat.FileGenerator gen =
         new OrcInputFormat.FileGenerator(context, fs,
             new MockPath(fs, "mock:/a"), false, null);
-    OrcInputFormat.SplitStrategy splitStrategy = createSplitStrategy(context, 
gen);
-    assertEquals(true, splitStrategy instanceof 
OrcInputFormat.ACIDSplitStrategy);
-    List<OrcSplit> splits = splitStrategy.getSplits();
+    List<OrcInputFormat.SplitStrategy<?>> splitStrategies = 
createSplitStrategies(context, gen);
+    assertEquals(true, splitStrategies.get(0) instanceof 
OrcInputFormat.ACIDSplitStrategy);
+    List<OrcSplit> splits = 
((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
     ColumnarSplitSizeEstimator splitSizeEstimator = new 
ColumnarSplitSizeEstimator();
     for (OrcSplit split: splits) {
       assertEquals(Integer.MAX_VALUE, 
splitSizeEstimator.getEstimatedSize(split));
@@ -605,6 +611,127 @@ public class TestInputOutputFormat {
   }
 
   @Test
+  public void testACIDSplitStrategyForSplitUpdate() throws Exception {
+    conf.set("bucket_count", "2");
+    conf.set(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
+    conf.set(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, 
"default");
+    OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
+
+    // Case 1: Test with just originals => Single split strategy with two 
splits.
+    MockFileSystem fs = new MockFileSystem(conf,
+        new MockFile("mock:/a/b/000000_0", 1000, new byte[1], new 
MockBlock("host1")),
+        new MockFile("mock:/a/b/000000_1", 1000, new byte[1], new 
MockBlock("host1")));
+    OrcInputFormat.FileGenerator gen =
+        new OrcInputFormat.FileGenerator(context, fs,
+            new MockPath(fs, "mock:/a"), false, null);
+    List<OrcInputFormat.SplitStrategy<?>> splitStrategies = 
createSplitStrategies(context, gen);
+    assertEquals(1, splitStrategies.size());
+    assertEquals(true, splitStrategies.get(0) instanceof 
OrcInputFormat.ACIDSplitStrategy);
+    List<OrcSplit> splits = 
((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
+    assertEquals(2, splits.size());
+    assertEquals("mock:/a/b/000000_0", 
splits.get(0).getPath().toUri().toString());
+    assertEquals("mock:/a/b/000000_1", 
splits.get(1).getPath().toUri().toString());
+    assertTrue(splits.get(0).isOriginal());
+    assertTrue(splits.get(1).isOriginal());
+
+    // Case 2: Test with originals and base => Single split strategy with two 
splits on compacted
+    // base since the presence of a base will make the originals obsolete.
+    fs = new MockFileSystem(conf,
+        new MockFile("mock:/a/b/000000_0", 1000, new byte[1], new 
MockBlock("host1")),
+        new MockFile("mock:/a/b/000000_1", 1000, new byte[1], new 
MockBlock("host1")),
+        new MockFile("mock:/a/base_0000001/bucket_00000", 1000, new byte[1], 
new MockBlock("host1")),
+        new MockFile("mock:/a/base_0000001/bucket_00001", 1000, new byte[1], 
new MockBlock("host1")));
+    gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, 
"mock:/a"), false, null);
+    splitStrategies = createSplitStrategies(context, gen);
+    assertEquals(1, splitStrategies.size());
+    assertEquals(true, splitStrategies.get(0) instanceof 
OrcInputFormat.ACIDSplitStrategy);
+    splits = 
((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
+    assertEquals(2, splits.size());
+    assertEquals("mock:/a/base_0000001/bucket_00000", 
splits.get(0).getPath().toUri().toString());
+    assertEquals("mock:/a/base_0000001/bucket_00001", 
splits.get(1).getPath().toUri().toString());
+    assertFalse(splits.get(0).isOriginal());
+    assertFalse(splits.get(1).isOriginal());
+
+    // Case 3: Test with originals and deltas => Two split strategies with two splits for each.
+    fs = new MockFileSystem(conf,
+        new MockFile("mock:/a/b/000000_0", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/b/000000_1", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/delete_delta_0000001_0000001_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/delete_delta_0000001_0000001_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
+    gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, "mock:/a"), false, null);
+    splitStrategies = createSplitStrategies(context, gen);
+    assertEquals(2, splitStrategies.size());
+    assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
+    splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
+    assertEquals(2, splits.size());
+    assertEquals("mock:/a/b/000000_0", splits.get(0).getPath().toUri().toString());
+    assertEquals("mock:/a/b/000000_1", splits.get(1).getPath().toUri().toString());
+    assertTrue(splits.get(0).isOriginal());
+    assertTrue(splits.get(1).isOriginal());
+    assertEquals(true, splitStrategies.get(1) instanceof OrcInputFormat.ACIDSplitStrategy);
+    splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(1)).getSplits();
+    assertEquals(2, splits.size());
+    assertEquals("mock:/a/delta_0000001_0000001_0000/bucket_00000", splits.get(0).getPath().toUri().toString());
+    assertEquals("mock:/a/delta_0000001_0000001_0000/bucket_00001", splits.get(1).getPath().toUri().toString());
+    assertFalse(splits.get(0).isOriginal());
+    assertFalse(splits.get(1).isOriginal());
+
+    // Case 4: Test with originals and deltas but now with only one bucket covered, i.e. we will
+    // have originals & insert_deltas for only one bucket, but the delete_deltas will be for two
+    // buckets => Two strategies with one split for each.
+    // When split-update is enabled, we do not need to account for buckets that aren't covered.
+    // The reason why we are able to do so is because the valid user data has already been considered
+    // as base for the covered buckets. Hence, the uncovered buckets do not have any relevant
+    // data and we can just ignore them.
+    fs = new MockFileSystem(conf,
+        new MockFile("mock:/a/b/000000_0", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/delete_delta_0000001_0000001_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/delete_delta_0000001_0000001_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
+    gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, "mock:/a"), false, null);
+    splitStrategies = createSplitStrategies(context, gen);
+    assertEquals(2, splitStrategies.size());
+    assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
+    splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
+    assertEquals(1, splits.size());
+    assertEquals("mock:/a/b/000000_0", splits.get(0).getPath().toUri().toString());
+    assertTrue(splits.get(0).isOriginal());
+    assertEquals(true, splitStrategies.get(1) instanceof OrcInputFormat.ACIDSplitStrategy);
+    splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(1)).getSplits();
+    assertEquals(1, splits.size());
+    assertEquals("mock:/a/delta_0000001_0000001_0000/bucket_00000", splits.get(0).getPath().toUri().toString());
+    assertFalse(splits.get(0).isOriginal());
+
+    // Case 5: Test with originals, compacted_base, insert_deltas, delete_deltas (exhaustive test)
+    // This should just generate one strategy with splits for base and insert_deltas.
+    fs = new MockFileSystem(conf,
+        new MockFile("mock:/a/b/000000_0", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/b/000000_1", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/base_0000001/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/base_0000001/bucket_00001", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/delta_0000002_0000002_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/delta_0000002_0000002_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/delete_delta_0000002_0000002_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/delete_delta_0000002_0000002_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
+    gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, "mock:/a"), false, null);
+    splitStrategies = createSplitStrategies(context, gen);
+    assertEquals(1, splitStrategies.size());
+    assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
+    splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
+    assertEquals(4, splits.size());
+    assertEquals("mock:/a/base_0000001/bucket_00000", splits.get(0).getPath().toUri().toString());
+    assertEquals("mock:/a/base_0000001/bucket_00001", splits.get(1).getPath().toUri().toString());
+    assertEquals("mock:/a/delta_0000002_0000002_0000/bucket_00000", splits.get(2).getPath().toUri().toString());
+    assertEquals("mock:/a/delta_0000002_0000002_0000/bucket_00001", splits.get(3).getPath().toUri().toString());
+    assertFalse(splits.get(0).isOriginal());
+    assertFalse(splits.get(1).isOriginal());
+    assertFalse(splits.get(2).isOriginal());
+    assertFalse(splits.get(3).isOriginal());
+  }
+
+  @Test
   public void testBIStrategySplitBlockBoundary() throws Exception {
     conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
     OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
@@ -617,9 +744,10 @@ public class TestInputOutputFormat {
     OrcInputFormat.FileGenerator gen =
         new OrcInputFormat.FileGenerator(context, fs,
             new MockPath(fs, "mock:/a/b"), false, null);
-    OrcInputFormat.SplitStrategy splitStrategy = createSplitStrategy(context, gen);
-    assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy);
-    List<OrcSplit> splits = splitStrategy.getSplits();
+    List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
+    assertEquals(1, splitStrategies.size());
+    assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.BISplitStrategy);
+    List<OrcSplit> splits = ((OrcInputFormat.BISplitStrategy)splitStrategies.get(0)).getSplits();
     int numSplits = splits.size();
     assertEquals(5, numSplits);
 
@@ -632,9 +760,10 @@ public class TestInputOutputFormat {
         new MockFile("mock:/a/b/part-04", 1000, new byte[1000], new 
MockBlock("host1", "host2")));
     gen = new OrcInputFormat.FileGenerator(context, fs,
         new MockPath(fs, "mock:/a/b"), false, null);
-    splitStrategy = createSplitStrategy(context, gen);
-    assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy);
-    splits = splitStrategy.getSplits();
+    splitStrategies = createSplitStrategies(context, gen);
+    assertEquals(1, splitStrategies.size());
+    assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.BISplitStrategy);
+    splits = ((OrcInputFormat.BISplitStrategy)splitStrategies.get(0)).getSplits();
     numSplits = splits.size();
     assertEquals(5, numSplits);
 
@@ -652,9 +781,10 @@ public class TestInputOutputFormat {
             new MockBlock("host1", "host2")));
     gen = new OrcInputFormat.FileGenerator(context, fs,
         new MockPath(fs, "mock:/a/b"), false, null);
-    splitStrategy = createSplitStrategy(context, gen);
-    assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy);
-    splits = splitStrategy.getSplits();
+    splitStrategies = createSplitStrategies(context, gen);
+    assertEquals(1, splitStrategies.size());
+    assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.BISplitStrategy);
+    splits = ((OrcInputFormat.BISplitStrategy)splitStrategies.get(0)).getSplits();
     numSplits = splits.size();
     assertEquals(10, numSplits);
 
@@ -672,9 +802,10 @@ public class TestInputOutputFormat {
             new MockBlock("host1", "host2")));
     gen = new OrcInputFormat.FileGenerator(context, fs,
         new MockPath(fs, "mock:/a/b"), false, null);
-    splitStrategy = createSplitStrategy(context, gen);
-    assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy);
-    splits = splitStrategy.getSplits();
+    splitStrategies = createSplitStrategies(context, gen);
+    assertEquals(1, splitStrategies.size());
+    assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.BISplitStrategy);
+    splits = ((OrcInputFormat.BISplitStrategy)splitStrategies.get(0)).getSplits();
     numSplits = splits.size();
     assertEquals(10, numSplits);
 
@@ -692,9 +823,10 @@ public class TestInputOutputFormat {
             new MockBlock("host1", "host2"), new MockBlock("host1", "host2")));
     gen = new OrcInputFormat.FileGenerator(context, fs,
         new MockPath(fs, "mock:/a/b"), false, null);
-    splitStrategy = createSplitStrategy(context, gen);
-    assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy);
-    splits = splitStrategy.getSplits();
+    splitStrategies = createSplitStrategies(context, gen);
+    assertEquals(1, splitStrategies.size());
+    assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.BISplitStrategy);
+    splits = ((OrcInputFormat.BISplitStrategy)splitStrategies.get(0)).getSplits();
     numSplits = splits.size();
     assertEquals(15, numSplits);
   }
@@ -717,22 +849,23 @@ public class TestInputOutputFormat {
 
     OrcInputFormat.CombinedCtx combineCtx = new OrcInputFormat.CombinedCtx();
     // The first directory becomes the base for combining.
-    SplitStrategy<?> ss = createOrCombineStrategy(context, fs, "mock:/a/1", combineCtx);
-    assertNull(ss);
+    List<SplitStrategy<?>> ss = createOrCombineStrategies(context, fs, "mock:/a/1", combineCtx);
+    assertTrue(ss.isEmpty());
     assertTrue(combineCtx.combined instanceof OrcInputFormat.ETLSplitStrategy);
     OrcInputFormat.ETLSplitStrategy etlSs = combineCtx.combined;
     assertEquals(2, etlSs.files.size());
     assertTrue(etlSs.isOriginal);
     assertEquals(1, etlSs.dirs.size());
     // The second one should be combined into the first.
-    ss = createOrCombineStrategy(context, fs, "mock:/a/2", combineCtx);
-    assertNull(ss);
+    ss = createOrCombineStrategies(context, fs, "mock:/a/2", combineCtx);
+    assertTrue(ss.isEmpty());
     assertTrue(combineCtx.combined instanceof OrcInputFormat.ETLSplitStrategy);
     assertEquals(4, etlSs.files.size());
     assertEquals(2, etlSs.dirs.size());
     // The third one has the base file, so it shouldn't be combined but could be a base.
-    ss = createOrCombineStrategy(context, fs, "mock:/a/3", combineCtx);
-    assertSame(etlSs, ss);
+    ss = createOrCombineStrategies(context, fs, "mock:/a/3", combineCtx);
+    assertEquals(1, ss.size());
+    assertSame(etlSs, ss.get(0));
     assertEquals(4, etlSs.files.size());
     assertEquals(2, etlSs.dirs.size());
     assertTrue(combineCtx.combined instanceof OrcInputFormat.ETLSplitStrategy);
@@ -741,34 +874,37 @@ public class TestInputOutputFormat {
     assertFalse(etlSs.isOriginal);
     assertEquals(1, etlSs.dirs.size());
     // Try the first again, it would not be combined and we'd retain the old base (less files).
-    ss = createOrCombineStrategy(context, fs, "mock:/a/1", combineCtx);
-    assertTrue(ss instanceof OrcInputFormat.ETLSplitStrategy);
-    assertNotSame(etlSs, ss);
-    OrcInputFormat.ETLSplitStrategy rejectedEtlSs = (OrcInputFormat.ETLSplitStrategy)ss;
+    ss = createOrCombineStrategies(context, fs, "mock:/a/1", combineCtx);
+    assertEquals(1, ss.size());
+    assertTrue(ss.get(0) instanceof OrcInputFormat.ETLSplitStrategy);
+    assertNotSame(etlSs, ss.get(0));
+    OrcInputFormat.ETLSplitStrategy rejectedEtlSs = (OrcInputFormat.ETLSplitStrategy)ss.get(0);
     assertEquals(2, rejectedEtlSs.files.size());
     assertEquals(1, rejectedEtlSs.dirs.size());
     assertTrue(rejectedEtlSs.isOriginal);
     assertEquals(1, etlSs.files.size());
     assertEquals(1, etlSs.dirs.size());
     // The fourth could be combined again.
-    ss = createOrCombineStrategy(context, fs, "mock:/a/4", combineCtx);
-    assertNull(ss);
+    ss = createOrCombineStrategies(context, fs, "mock:/a/4", combineCtx);
+    assertTrue(ss.isEmpty());
     assertTrue(combineCtx.combined instanceof OrcInputFormat.ETLSplitStrategy);
     assertEquals(2, etlSs.files.size());
     assertEquals(2, etlSs.dirs.size());
     // The fifth will not be combined because of delta files.
-    ss = createOrCombineStrategy(context, fs, "mock:/a/5", combineCtx);
-    assertTrue(ss instanceof OrcInputFormat.ETLSplitStrategy);
+    ss = createOrCombineStrategies(context, fs, "mock:/a/5", combineCtx);
+    assertEquals(1, ss.size());
+    assertTrue(ss.get(0) instanceof OrcInputFormat.ETLSplitStrategy);
     assertNotSame(etlSs, ss);
     assertEquals(2, etlSs.files.size());
     assertEquals(2, etlSs.dirs.size());
   }
 
-  public SplitStrategy<?> createOrCombineStrategy(OrcInputFormat.Context context,
+  public List<SplitStrategy<?>> createOrCombineStrategies(OrcInputFormat.Context context,
       MockFileSystem fs, String path, OrcInputFormat.CombinedCtx combineCtx) throws IOException {
     OrcInputFormat.AcidDirInfo adi = createAdi(context, fs, path);
-    return OrcInputFormat.determineSplitStrategy(combineCtx, context,
-        adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, null, null, true);
+    return OrcInputFormat.determineSplitStrategies(combineCtx, context,
+        adi.fs, adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas,
+        null, null, true);
   }
 
   public OrcInputFormat.AcidDirInfo createAdi(
@@ -777,11 +913,12 @@ public class TestInputOutputFormat {
         context, fs, new MockPath(fs, path), false, null).call();
   }
 
-  private OrcInputFormat.SplitStrategy createSplitStrategy(
+  private List<OrcInputFormat.SplitStrategy<?>> createSplitStrategies(
       OrcInputFormat.Context context, OrcInputFormat.FileGenerator gen) throws IOException {
     OrcInputFormat.AcidDirInfo adi = gen.call();
-    return OrcInputFormat.determineSplitStrategy(
-        null, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, null, null, true);
+    return OrcInputFormat.determineSplitStrategies(
+        null, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas,
+        null, null, true);
   }
 
   public static class MockBlock {
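Note for readers skimming the diff above: the updated helpers return a List<SplitStrategy<?>> instead of a single strategy, and each element is inspected with an instanceof check and a cast before its splits are read. The following sketch only restates that consumption pattern using the calls that already appear in the new test code; the surrounding setup and the println are illustrative and are not part of this commit.

    // Illustrative only: mirrors how testACIDSplitStrategyForSplitUpdate walks the strategies
    // returned by the list-based determineSplitStrategies(...) API; assumes the same
    // context/gen pair built from a MockFileSystem as in the test above.
    List<OrcInputFormat.SplitStrategy<?>> strategies = createSplitStrategies(context, gen);
    for (OrcInputFormat.SplitStrategy<?> strategy : strategies) {
      if (strategy instanceof OrcInputFormat.ACIDSplitStrategy) {
        List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy) strategy).getSplits();
        for (OrcSplit split : splits) {
          // Originals, compacted base files and insert deltas surface as splits here;
          // delete deltas never do, which is why the assertions above only ever check
          // base/original/delta paths.
          System.out.println(split.getPath() + " original=" + split.isOriginal());
        }
      }
    }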
