Repository: hive
Updated Branches:
  refs/heads/master f5b14fc04 -> ddf3b6cd0


http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java 
b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 77fe736..5e085f8 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.txn.AcidWriteSetService;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.TestTxnCommands2;
 import org.junit.After;
 import org.junit.Assert;
 import org.apache.hadoop.hive.common.FileUtils;
@@ -87,7 +88,7 @@ public class TestDbTxnManager2 {
   private static HiveConf conf = new HiveConf(Driver.class);
   private HiveTxnManager txnMgr;
   private Context ctx;
-  private Driver driver;
+  private Driver driver, driver2;
   private TxnStore txnHandler;
 
   public TestDbTxnManager2() throws Exception {
@@ -103,6 +104,7 @@ public class TestDbTxnManager2 {
     SessionState.start(conf);
     ctx = new Context(conf);
     driver = new Driver(new 
QueryState.Builder().withHiveConf(conf).nonIsolated().build(), null);
+    driver2 = new Driver(new QueryState.Builder().withHiveConf(conf).build(), 
null);
     TxnDbUtil.cleanDb(conf);
     TxnDbUtil.prepDb(conf);
     SessionState ss = SessionState.get();
@@ -115,6 +117,7 @@ public class TestDbTxnManager2 {
   @After
   public void tearDown() throws Exception {
     driver.close();
+    driver2.close();
     if (txnMgr != null) {
       txnMgr.closeTxnManager();
     }
@@ -548,10 +551,10 @@ public class TestDbTxnManager2 {
     checkCmdOnDriver(cpr);
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' 
and CQ_TYPE='i'");
     Assert.assertEquals(1, count);
-    org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf);
+    TestTxnCommands2.runWorker(conf);
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='r' 
and CQ_TYPE='i'");
     Assert.assertEquals(1, count);
-    org.apache.hadoop.hive.ql.TestTxnCommands2.runCleaner(conf);
+    TestTxnCommands2.runCleaner(conf);
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11'");
     Assert.assertEquals(0, count);
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t11' and 
CC_STATE='s' and CC_TYPE='i'");
@@ -561,10 +564,10 @@ public class TestDbTxnManager2 {
     checkCmdOnDriver(cpr);
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and 
CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='i'");
     Assert.assertEquals(1, count);
-    org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf);
+    TestTxnCommands2.runWorker(conf);
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and 
CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='r' and CQ_TYPE='i'");
     Assert.assertEquals(1, count);
-    org.apache.hadoop.hive.ql.TestTxnCommands2.runCleaner(conf);
+    TestTxnCommands2.runCleaner(conf);
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p'");
     Assert.assertEquals(0, count);
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t12p' and 
CC_STATE='s' and CC_TYPE='i'");
@@ -576,7 +579,7 @@ public class TestDbTxnManager2 {
     checkCmdOnDriver(cpr);
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' 
and CQ_TYPE='a'");
     Assert.assertEquals(1, count);
-    org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf); // will fail
+    TestTxnCommands2.runWorker(conf); // will fail
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' 
and CQ_TYPE='a'");
     Assert.assertEquals(0, count);
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t11' and 
CC_STATE='f' and CC_TYPE='a'");
@@ -586,7 +589,7 @@ public class TestDbTxnManager2 {
     checkCmdOnDriver(cpr);
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and 
CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='a'");
     Assert.assertEquals(1, count);
-    org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf); // will fail
+    TestTxnCommands2.runWorker(conf); // will fail
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and 
CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='a'");
     Assert.assertEquals(0, count);
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t12p' and 
CC_STATE='f' and CC_TYPE='a'");
@@ -824,7 +827,7 @@ public class TestDbTxnManager2 {
    * the TxnManager instance in the session (hacky but nothing is actually 
threading so it allows us
    * to write good tests)
    */
-  private static HiveTxnManager swapTxnManager(HiveTxnManager txnMgr) {
+  public static HiveTxnManager swapTxnManager(HiveTxnManager txnMgr) {
     return SessionState.get().setTxnMgr(txnMgr);
   }
   @Test

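For context, a minimal sketch of the two-"session" test pattern that the new driver2 field and the now-public swapTxnManager() are presumably meant to support. The second transaction manager is assumed to come from TxnManagerFactory (as in the test's setup), and the table/queries are made up:

    // Hedged sketch: interleave two logical sessions on one thread.
    HiveTxnManager txnMgr2 =
        TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); // assumed factory
    swapTxnManager(txnMgr2);                                  // "session 2" becomes current
    checkCmdOnDriver(driver2.run("insert into T values(1,2)")); // hypothetical table T
    swapTxnManager(txnMgr);                                   // back to "session 1"
    checkCmdOnDriver(driver.run("select * from T"));
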
http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 124c97e..cfd7290 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -359,7 +359,7 @@ public abstract class CompactorTest {
       }
       FSDataOutputStream out = fs.create(partFile);
       if (type == FileType.LENGTH_FILE) {
-        out.writeInt(numRecords);
+        out.writeInt(numRecords);//hmm - length files should store length in 
bytes...
       } else {
         for (int i = 0; i < numRecords; i++) {
           RecordIdentifier ri = new RecordIdentifier(maxTxn - 1, bucket, i);

http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index ce574b4..467851a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -18,34 +18,20 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
-import org.apache.hadoop.hive.metastore.api.DataOperationType;
-import org.apache.hadoop.hive.metastore.api.LockComponent;
-import org.apache.hadoop.hive.metastore.api.LockLevel;
-import org.apache.hadoop.hive.metastore.api.LockRequest;
-import org.apache.hadoop.hive.metastore.api.LockResponse;
-import org.apache.hadoop.hive.metastore.api.LockType;
-import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
-import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
-import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.UnlockRequest;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Tests for the compactor Cleaner thread
@@ -198,231 +184,6 @@ public class TestCleaner extends CompactorTest {
   }
 
   @Test
-  public void blockedByLockTable() throws Exception {
-    Table t = newTable("default", "bblt", false);
-
-    addBaseFile(t, null, 20L, 20);
-    addDeltaFile(t, null, 21L, 22L, 2);
-    addDeltaFile(t, null, 23L, 24L, 2);
-    addDeltaFile(t, null, 21L, 24L, 4);
-
-    burnThroughTransactions("default", "bblt", 25);
-
-    CompactionRequest rqst = new CompactionRequest("default", "bblt", 
CompactionType.MINOR);
-    txnHandler.compact(rqst);
-    CompactionInfo ci = txnHandler.findNextToCompact("fred");
-    txnHandler.markCompacted(ci);
-    txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
-
-    LockComponent comp = new LockComponent(LockType.SHARED_READ, 
LockLevel.TABLE, "default");
-    comp.setTablename("bblt");
-    comp.setOperationType(DataOperationType.SELECT);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    LockResponse res = txnHandler.lock(req);
-
-    startCleaner();
-
-    // Check there are no compactions requests left.
-    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(1, compacts.size());
-    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
-    Assert.assertEquals("bblt", compacts.get(0).getTablename());
-    Assert.assertEquals(CompactionType.MINOR, compacts.get(0).getType());
-  }
-
-  @Test
-  public void blockedByLockPartition() throws Exception {
-    Table t = newTable("default", "bblp", true);
-    Partition p = newPartition(t, "today");
-
-    addBaseFile(t, p, 20L, 20);
-    addDeltaFile(t, p, 21L, 22L, 2);
-    addDeltaFile(t, p, 23L, 24L, 2);
-    addDeltaFile(t, p, 21L, 24L, 4);
-
-    burnThroughTransactions("default", "bblp", 25);
-
-    CompactionRequest rqst = new CompactionRequest("default", "bblp", 
CompactionType.MINOR);
-    rqst.setPartitionname("ds=today");
-    txnHandler.compact(rqst);
-    CompactionInfo ci = txnHandler.findNextToCompact("fred");
-    txnHandler.markCompacted(ci);
-    txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
-
-    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, 
LockLevel.PARTITION, "default");
-    comp.setTablename("bblp");
-    comp.setPartitionname("ds=today");
-    comp.setOperationType(DataOperationType.DELETE);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    OpenTxnsResponse resp = txnHandler.openTxns(new OpenTxnRequest(1, 
"Dracula", "Transylvania"));
-    req.setTxnid(resp.getTxn_ids().get(0));
-    LockResponse res = txnHandler.lock(req);
-
-    startCleaner();
-
-    // Check there are no compactions requests left.
-    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(1, compacts.size());
-    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
-    Assert.assertEquals("bblp", compacts.get(0).getTablename());
-    Assert.assertEquals("ds=today", compacts.get(0).getPartitionname());
-    Assert.assertEquals(CompactionType.MINOR, compacts.get(0).getType());
-  }
-
-  @Test
-  public void notBlockedBySubsequentLock() throws Exception {
-    Table t = newTable("default", "bblt", false);
-
-    // Set the run frequency low on this test so it doesn't take long
-    conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, 100,
-        TimeUnit.MILLISECONDS);
-
-    addBaseFile(t, null, 20L, 20);
-    addDeltaFile(t, null, 21L, 22L, 2);
-    addDeltaFile(t, null, 23L, 24L, 2);
-    addDeltaFile(t, null, 21L, 24L, 4);
-
-    burnThroughTransactions("default", "bblt", 25);
-
-    CompactionRequest rqst = new CompactionRequest("default", "bblt", 
CompactionType.MINOR);
-    txnHandler.compact(rqst);
-    CompactionInfo ci = txnHandler.findNextToCompact("fred");
-    txnHandler.markCompacted(ci);
-    txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
-
-    LockComponent comp = new LockComponent(LockType.SHARED_READ, 
LockLevel.TABLE, "default");
-    comp.setTablename("bblt");
-    comp.setOperationType(DataOperationType.INSERT);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    LockResponse res = txnHandler.lock(req);
-
-    AtomicBoolean looped = new AtomicBoolean();
-    looped.set(false);
-    startCleaner(looped);
-
-    // Make sure the compactor has a chance to run once
-    while (!looped.get()) {
-      Thread.currentThread().sleep(100);
-    }
-
-    // There should still be one request, as the locks still held.
-    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(1, compacts.size());
-
-    // obtain a second lock.  This shouldn't block cleaner as it was acquired 
after the initial
-    // clean request
-    LockComponent comp2 = new LockComponent(LockType.SHARED_READ, 
LockLevel.TABLE, "default");
-    comp2.setTablename("bblt");
-    comp.setOperationType(DataOperationType.SELECT);
-    List<LockComponent> components2 = new ArrayList<LockComponent>(1);
-    components2.add(comp2);
-    LockRequest req2 = new LockRequest(components, "me", "localhost");
-    LockResponse res2 = txnHandler.lock(req2);
-
-    // Unlock the previous lock
-    txnHandler.unlock(new UnlockRequest(res.getLockid()));
-    looped.set(false);
-
-    while (!looped.get()) {
-      Thread.currentThread().sleep(100);
-    }
-    stopThread();
-    Thread.currentThread().sleep(200);
-
-
-    // Check there are no compactions requests left.
-    rsp = txnHandler.showCompact(new ShowCompactRequest());
-    compacts = rsp.getCompacts();
-    Assert.assertEquals(1, compacts.size());
-    
Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
-  }
-
-  @Test
-  public void partitionNotBlockedBySubsequentLock() throws Exception {
-    Table t = newTable("default", "bblt", true);
-    Partition p = newPartition(t, "today");
-
-    // Set the run frequency low on this test so it doesn't take long
-    conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, 100,
-        TimeUnit.MILLISECONDS);
-
-    addBaseFile(t, p, 20L, 20);
-    addDeltaFile(t, p, 21L, 22L, 2);
-    addDeltaFile(t, p, 23L, 24L, 2);
-    addDeltaFile(t, p, 21L, 24L, 4);
-
-    burnThroughTransactions("default", "bblt", 25);
-
-    CompactionRequest rqst = new CompactionRequest("default", "bblt", 
CompactionType.MINOR);
-    rqst.setPartitionname("ds=today");
-    txnHandler.compact(rqst);
-    CompactionInfo ci = txnHandler.findNextToCompact("fred");
-    txnHandler.markCompacted(ci);
-    txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
-
-    LockComponent comp = new LockComponent(LockType.SHARED_READ, 
LockLevel.PARTITION, "default");
-    comp.setTablename("bblt");
-    comp.setPartitionname("ds=today");
-    comp.setOperationType(DataOperationType.INSERT);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    LockResponse res = txnHandler.lock(req);
-
-    AtomicBoolean looped = new AtomicBoolean();
-    looped.set(false);
-    startCleaner(looped);
-
-    // Make sure the compactor has a chance to run once
-    while (!looped.get()) {
-      Thread.currentThread().sleep(100);
-    }
-
-    // There should still be one request, as the locks still held.
-    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(1, compacts.size());
-
-
-    // obtain a second lock.  This shouldn't block cleaner as it was acquired 
after the initial
-    // clean request
-    LockComponent comp2 = new LockComponent(LockType.SHARED_READ, 
LockLevel.PARTITION, "default");
-    comp2.setTablename("bblt");
-    comp2.setPartitionname("ds=today");
-    comp.setOperationType(DataOperationType.SELECT);
-    List<LockComponent> components2 = new ArrayList<LockComponent>(1);
-    components2.add(comp2);
-    LockRequest req2 = new LockRequest(components, "me", "localhost");
-    LockResponse res2 = txnHandler.lock(req2);
-
-    // Unlock the previous lock
-    txnHandler.unlock(new UnlockRequest(res.getLockid()));
-    looped.set(false);
-
-    while (!looped.get()) {
-      Thread.currentThread().sleep(100);
-    }
-    stopThread();
-    Thread.currentThread().sleep(200);
-
-
-    // Check there are no compactions requests left.
-    rsp = txnHandler.showCompact(new ShowCompactRequest());
-    compacts = rsp.getCompacts();
-    Assert.assertEquals(1, compacts.size());
-    
Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
-  }
-
-  @Test
   public void cleanupAfterMajorPartitionCompactionNoBase() throws Exception {
     Table t = newTable("default", "campcnb", true);
     Partition p = newPartition(t, "today");

http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index 488cd90..d9e4468 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -300,7 +300,7 @@ public class TestWorker extends CompactorTest {
     // Find the new delta file and make sure it has the right contents
     boolean sawNewDelta = false;
     for (int i = 0; i < stat.length; i++) {
-      if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 
24))) {
+      if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24) 
+ "_v0000026")) {
         sawNewDelta = true;
         FileStatus[] buckets = fs.listStatus(stat[i].getPath(), 
AcidUtils.hiddenFileFilter);
         Assert.assertEquals(2, buckets.length);
@@ -309,7 +309,7 @@ public class TestWorker extends CompactorTest {
         Assert.assertEquals(104L, buckets[0].getLen());
         Assert.assertEquals(104L, buckets[1].getLen());
       }
-      if 
(stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21, 24))) {
+      if 
(stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21, 24) + 
"_v0000026")) {
         sawNewDelta = true;
         FileStatus[] buckets = fs.listStatus(stat[i].getPath(), 
AcidUtils.hiddenFileFilter);
         Assert.assertEquals(2, buckets.length);
@@ -322,7 +322,7 @@ public class TestWorker extends CompactorTest {
         LOG.debug("This is not the delta file you are looking for " + 
stat[i].getPath().getName());
       }
     }
-    Assert.assertTrue(sawNewDelta);
+    Assert.assertTrue(toString(stat), sawNewDelta);
   }
 
   /**
@@ -354,15 +354,22 @@ public class TestWorker extends CompactorTest {
     // There should still now be 5 directories in the location
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
-    Assert.assertEquals(5, stat.length);
+    Assert.assertEquals(toString(stat), 6, stat.length);
 
     // Find the new delta file and make sure it has the right contents
     Arrays.sort(stat);
     Assert.assertEquals("base_20", stat[0].getPath().getName());
-    Assert.assertEquals(makeDeleteDeltaDirNameCompacted(21, 22), 
stat[1].getPath().getName());
+    /**
+     * This may look a bit odd.  The compactor is capped at the min open write id,
+     * which is 23 in this case, so the minor compaction above has only 1 dir as
+     * input, delta_21_22, and outputs delta_21_22_v28 (and a matching delete_delta)
+     * (HIVE-9995/HIVE-20901)
+     */
+    Assert.assertEquals(makeDeleteDeltaDirNameCompacted(21, 22) + "_v0000028",
+        stat[1].getPath().getName());
     Assert.assertEquals(makeDeltaDirNameCompacted(21, 22), 
stat[2].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName());
+    Assert.assertEquals(makeDeltaDirNameCompacted(21, 22) + "_v0000028", 
stat[3].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(23, 25), stat[4].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(26, 27), stat[5].getPath().getName());
   }
 
   @Test
@@ -395,9 +402,9 @@ public class TestWorker extends CompactorTest {
     // Find the new delta file and make sure it has the right contents
     Arrays.sort(stat);
     Assert.assertEquals("base_20", stat[0].getPath().getName());
-    Assert.assertEquals(makeDeleteDeltaDirNameCompacted(21, 27), 
stat[1].getPath().getName());
+    Assert.assertEquals(makeDeleteDeltaDirNameCompacted(21, 27) + "_v0000028", 
stat[1].getPath().getName());
     Assert.assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName());
-    Assert.assertEquals(makeDeltaDirNameCompacted(21, 27), 
stat[3].getPath().getName());
+    Assert.assertEquals(makeDeltaDirNameCompacted(21, 27) + "_v0000028", 
stat[3].getPath().getName());
     Assert.assertEquals(makeDeltaDirName(23, 25), stat[4].getPath().getName());
     Assert.assertEquals(makeDeltaDirName(26, 27), stat[5].getPath().getName());
   }
@@ -432,7 +439,7 @@ public class TestWorker extends CompactorTest {
     // Find the new delta file and make sure it has the right contents
     boolean sawNewDelta = false;
     for (int i = 0; i < stat.length; i++) {
-      if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 
24))) {
+      if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24) 
+ "_v0000026")) {
         sawNewDelta = true;
         FileStatus[] buckets = fs.listStatus(stat[i].getPath(), 
AcidUtils.hiddenFileFilter);
         Assert.assertEquals(2, buckets.length);
@@ -453,7 +460,7 @@ public class TestWorker extends CompactorTest {
         LOG.debug("This is not the delta file you are looking for " + 
stat[i].getPath().getName());
       }
     }
-    Assert.assertTrue(sawNewDelta);
+    Assert.assertTrue(toString(stat), sawNewDelta);
   }
 
   @Test
@@ -484,7 +491,7 @@ public class TestWorker extends CompactorTest {
     // Find the new delta file and make sure it has the right contents
     boolean sawNewDelta = false;
     for (int i = 0; i < stat.length; i++) {
-      if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(1, 4))) 
{
+      if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(1, 4) + 
"_v0000006")) {
         sawNewDelta = true;
         FileStatus[] buckets = fs.listStatus(stat[i].getPath(), 
AcidUtils.hiddenFileFilter);
         Assert.assertEquals(2, buckets.length);
@@ -493,7 +500,7 @@ public class TestWorker extends CompactorTest {
         Assert.assertEquals(104L, buckets[0].getLen());
         Assert.assertEquals(104L, buckets[1].getLen());
       }
-      if 
(stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(1, 4))) {
+      if 
(stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(1, 4) + 
"_v0000006")) {
         sawNewDelta = true;
         FileStatus[] buckets = fs.listStatus(stat[i].getPath(), 
AcidUtils.hiddenFileFilter);
         Assert.assertEquals(2, buckets.length);
@@ -505,7 +512,7 @@ public class TestWorker extends CompactorTest {
         LOG.debug("This is not the delta file you are looking for " + 
stat[i].getPath().getName());
       }
     }
-    Assert.assertTrue(sawNewDelta);
+    Assert.assertTrue(toString(stat), sawNewDelta);
   }
 
   @Test
@@ -537,7 +544,7 @@ public class TestWorker extends CompactorTest {
     // Find the new delta file and make sure it has the right contents
     boolean sawNewBase = false;
     for (int i = 0; i < stat.length; i++) {
-      if (stat[i].getPath().getName().equals("base_0000024")) {
+      if (stat[i].getPath().getName().equals("base_0000024_v0000026")) {
         sawNewBase = true;
         FileStatus[] buckets = fs.listStatus(stat[i].getPath(), 
FileUtils.HIDDEN_FILES_PATH_FILTER);
         Assert.assertEquals(2, buckets.length);
@@ -549,7 +556,7 @@ public class TestWorker extends CompactorTest {
         LOG.debug("This is not the file you are looking for " + 
stat[i].getPath().getName());
       }
     }
-    Assert.assertTrue(sawNewBase);
+    Assert.assertTrue(toString(stat), sawNewBase);
   }
 
   @Test
@@ -612,68 +619,37 @@ public class TestWorker extends CompactorTest {
     Assert.assertEquals(numFilesExpected, stat.length);
 
     // Find the new delta file and make sure it has the right contents
-    BitSet matchesFound = new BitSet(numFilesExpected);
-    for (int i = 0, j = 0; i < stat.length; i++) {
-      
if(stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21,23))) {
-        matchesFound.set(j++);
-      }
-      
if(stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(25,33))) {
-        matchesFound.set(j++);
-      }
-      if(stat[i].getPath().getName().equals(makeDeltaDirName(21,21))) {
-        matchesFound.set(j++);
-      }
-      else if(stat[i].getPath().getName().equals(makeDeltaDirName(23, 23))) {
-        matchesFound.set(j++);
-      }
-      else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(25, 
29))) {
-        matchesFound.set(j++);
-      }
-      else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(31, 
32))) {
-        matchesFound.set(j++);
-      }
-      else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(31, 
33))) {
-        matchesFound.set(j++);
-      }
-      else if(stat[i].getPath().getName().equals(makeDeltaDirName(35, 35))) {
-        matchesFound.set(j++);
-      }
-      else 
if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21,23))) {
-        matchesFound.set(j++);
-      }
-      else 
if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(25,33))) {
-        matchesFound.set(j++);
-      }
-      switch (type) {
-        case MINOR:
-          
if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21,35))) {
-            matchesFound.set(j++);
-          }
-          if 
(stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21, 35))) {
-            matchesFound.set(j++);
-          }
-          break;
-        case MAJOR:
-          if(stat[i].getPath().getName().equals(AcidUtils.baseDir(35))) {
-            matchesFound.set(j++);
-          }
-          break;
-        default:
-          throw new IllegalStateException();
-      }
+    List<String> matchesNotFound = new ArrayList<>(numFilesExpected);
+    matchesNotFound.add(makeDeleteDeltaDirNameCompacted(21,23) + "_v\\d+");
+    matchesNotFound.add(makeDeleteDeltaDirNameCompacted(25,33) + "_v\\d+");
+    matchesNotFound.add(makeDeltaDirName(21,21));
+    matchesNotFound.add(makeDeltaDirName(23, 23));
+    matchesNotFound.add(makeDeltaDirNameCompacted(25, 29));//streaming ingest
+    matchesNotFound.add(makeDeltaDirNameCompacted(31, 32));//streaming ingest
+    //todo: this should have some _vXXXX suffix but addDeltaFile() doesn't 
support it
+    matchesNotFound.add(makeDeltaDirNameCompacted(31, 33));
+    matchesNotFound.add(makeDeltaDirName(35, 35));
+    matchesNotFound.add(makeDeltaDirNameCompacted(21,23) + "_v\\d+");
+    matchesNotFound.add(makeDeltaDirNameCompacted(25,33) + "_v\\d+");
+    if(type == CompactionType.MINOR) {
+      matchesNotFound.add(makeDeltaDirNameCompacted(21,35) + "_v\\d+");
+      matchesNotFound.add(makeDeleteDeltaDirNameCompacted(21, 35) + "_v\\d+");
+    }
+    if(type == CompactionType.MAJOR) {
+      matchesNotFound.add(AcidUtils.baseDir(35) + "_v\\d+");
     }
-    StringBuilder sb = null;
-    for(int i = 0; i < stat.length; i++) {
-      if(!matchesFound.get(i)) {
-        if(sb == null) {
-          sb = new StringBuilder("Some files are missing at index: ");
+    for(FileStatus f : stat) {
+      for(int j = 0; j < matchesNotFound.size(); j++) {
+        if (f.getPath().getName().matches(matchesNotFound.get(j))) {
+          matchesNotFound.remove(j);
+          break;
         }
-        sb.append(i).append(",");
       }
     }
-    if (sb != null) {
-      Assert.assertTrue(sb.toString(), false);
+    if(matchesNotFound.size() == 0) {
+      return;
     }
+    Assert.assertTrue("Files remaining: " + matchesNotFound + "; " + 
toString(stat), false);
   }
   @Test
   public void majorPartitionWithBase() throws Exception {
@@ -706,7 +682,7 @@ public class TestWorker extends CompactorTest {
     // Find the new delta file and make sure it has the right contents
     boolean sawNewBase = false;
     for (int i = 0; i < stat.length; i++) {
-      if (stat[i].getPath().getName().equals("base_0000024")) {
+      if (stat[i].getPath().getName().equals("base_0000024_v0000026")) {
         sawNewBase = true;
         FileStatus[] buckets = fs.listStatus(stat[i].getPath(), 
FileUtils.HIDDEN_FILES_PATH_FILTER);
         Assert.assertEquals(2, buckets.length);
@@ -718,7 +694,7 @@ public class TestWorker extends CompactorTest {
         LOG.debug("This is not the file you are looking for " + 
stat[i].getPath().getName());
       }
     }
-    Assert.assertTrue(sawNewBase);
+    Assert.assertTrue(toString(stat), sawNewBase);
   }
 
   @Test
@@ -749,7 +725,7 @@ public class TestWorker extends CompactorTest {
     // Find the new delta file and make sure it has the right contents
     boolean sawNewBase = false;
     for (int i = 0; i < stat.length; i++) {
-      if (stat[i].getPath().getName().equals("base_0000004")) {
+      if (stat[i].getPath().getName().equals("base_0000004_v0000005")) {
         sawNewBase = true;
         FileStatus[] buckets = fs.listStatus(stat[i].getPath(), 
FileUtils.HIDDEN_FILES_PATH_FILTER);
         Assert.assertEquals(2, buckets.length);
@@ -761,9 +737,20 @@ public class TestWorker extends CompactorTest {
         LOG.debug("This is not the file you are looking for " + 
stat[i].getPath().getName());
       }
     }
-    Assert.assertTrue(sawNewBase);
+    Assert.assertTrue(toString(stat), sawNewBase);
   }
 
+  private static String toString(FileStatus[] stat) {
+    StringBuilder sb = new StringBuilder("stat{");
+    if(stat == null) {
+      return sb.toString();
+    }
+    for(FileStatus f : stat) {
+      sb.append(f.getPath()).append(",");
+    }
+    sb.setCharAt(sb.length() - 1, '}');
+    return sb.toString();
+  }
   @Test
   public void majorTableLegacy() throws Exception {
     LOG.debug("Starting majorTableLegacy");
@@ -793,7 +780,7 @@ public class TestWorker extends CompactorTest {
     // Find the new delta file and make sure it has the right contents
     boolean sawNewBase = false;
     for (int i = 0; i < stat.length; i++) {
-      if (stat[i].getPath().getName().equals("base_0000024")) {
+      if (stat[i].getPath().getName().equals("base_0000024_v0000026")) {
         sawNewBase = true;
         FileStatus[] buckets = fs.listStatus(stat[i].getPath(), 
FileUtils.HIDDEN_FILES_PATH_FILTER);
         Assert.assertEquals(2, buckets.length);
@@ -805,7 +792,7 @@ public class TestWorker extends CompactorTest {
         LOG.debug("This is not the file you are looking for " + 
stat[i].getPath().getName());
       }
     }
-    Assert.assertTrue(sawNewBase);
+    Assert.assertTrue(toString(stat), sawNewBase);
   }
 
   @Test
@@ -836,7 +823,7 @@ public class TestWorker extends CompactorTest {
     // Find the new delta file and make sure it has the right contents
     boolean sawNewDelta = false;
     for (int i = 0; i < stat.length; i++) {
-      if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 
24))) {
+      if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24) 
+ "_v0000026")) {
         sawNewDelta = true;
         FileStatus[] buckets = fs.listStatus(stat[i].getPath(), 
AcidUtils.hiddenFileFilter);
         Assert.assertEquals(2, buckets.length);
@@ -846,7 +833,7 @@ public class TestWorker extends CompactorTest {
         LOG.debug("This is not the file you are looking for " + 
stat[i].getPath().getName());
       }
     }
-    Assert.assertTrue(sawNewDelta);
+    Assert.assertTrue(toString(stat), sawNewDelta);
   }
 
   @Test
@@ -881,7 +868,7 @@ public class TestWorker extends CompactorTest {
     // Find the new delta file and make sure it has the right contents
     boolean sawNewBase = false;
     for (int i = 0; i < stat.length; i++) {
-      if (stat[i].getPath().getName().equals("base_0000026")) {
+      if (stat[i].getPath().getName().equals("base_0000026_v0000028")) {
         sawNewBase = true;
         FileStatus[] buckets = fs.listStatus(stat[i].getPath(), 
FileUtils.HIDDEN_FILES_PATH_FILTER);
         Assert.assertEquals(2, buckets.length);
@@ -901,7 +888,7 @@ public class TestWorker extends CompactorTest {
         LOG.debug("This is not the file you are looking for " + 
stat[i].getPath().getName());
       }
     }
-    Assert.assertTrue(sawNewBase);
+    Assert.assertTrue(toString(stat), sawNewBase);
   }
 
   @Test
@@ -933,7 +920,7 @@ public class TestWorker extends CompactorTest {
 
     // Find the new delta file and make sure it has the right contents
     Arrays.sort(stat);
-    Assert.assertEquals("base_0000022", stat[0].getPath().getName());
+    Assert.assertEquals("base_0000022_v0000028", stat[0].getPath().getName());
     Assert.assertEquals("base_20", stat[1].getPath().getName());
     Assert.assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName());
     Assert.assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName());
@@ -969,7 +956,7 @@ public class TestWorker extends CompactorTest {
 
     // Find the new delta file and make sure it has the right contents
     Arrays.sort(stat);
-    Assert.assertEquals("base_0000027", stat[0].getPath().getName());
+    Assert.assertEquals("base_0000027_v0000028", stat[0].getPath().getName());
     Assert.assertEquals("base_20", stat[1].getPath().getName());
     Assert.assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName());
     Assert.assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName());

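The assertions above now expect compactor output directories to carry a visibility suffix (e.g. base_0000024_v0000026, delta_0000021_0000024_v0000026). Below is a small, illustrative sketch of how such names can be formed; the 7-digit "_v" format is inferred from the literals in this patch, and the helper is not a Hive API:

    // Illustrative only; the real directory names come from the compactor itself.
    public class CompactedDirNames {
      static String withVisibilitySuffix(String compactedDir, long compactorTxnId) {
        // "_v" + zero-padded txn id of the compactor's own transaction (assumed format)
        return compactedDir + String.format("_v%07d", compactorTxnId);
      }
      public static void main(String[] args) {
        System.out.println(withVisibilitySuffix("base_0000024", 26));
        // -> base_0000024_v0000026
        System.out.println(withVisibilitySuffix("delta_0000021_0000024", 26));
        // -> delta_0000021_0000024_v0000026
      }
    }
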
http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnCommonUtils.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnCommonUtils.java
 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnCommonUtils.java
index c61a997..43cc805 100644
--- 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnCommonUtils.java
+++ 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnCommonUtils.java
@@ -89,7 +89,7 @@ public class TxnCommonUtils {
    * {@link org.apache.hadoop.hive.common.ValidTxnWriteIdList}.  This assumes 
that the caller intends to
    * read the files, and thus treats both open and aborted transactions as 
invalid.
    * @param currentTxnId current txn ID for which we get the valid write ids 
list
-   * @param list valid write ids list from the metastore
+   * @param validIds valid write ids list from the metastore
    * @return a valid write IDs list for the whole transaction.
    */
   public static ValidTxnWriteIdList createValidTxnWriteIdList(Long 
currentTxnId,

http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index cbb76d5..7202cc6 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -17,10 +17,10 @@
  */
 package org.apache.hadoop.hive.metastore.txn;
 
-import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.classification.RetrySemantics;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
 import org.apache.hadoop.util.StringUtils;
@@ -324,6 +324,57 @@ class CompactionTxnHandler extends TxnHandler {
       return findReadyToClean();
     }
   }
+  @Override
+  public long findMinOpenTxnId() throws MetaException {
+    Connection dbConn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        return findMinOpenTxnGLB(stmt);
+      } catch (SQLException e) {
+        LOG.error("Unable to findMinOpenTxnId() due to:" + e.getMessage());
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "findMinOpenTxnId");
+        throw new MetaException("Unable to execute findMinOpenTxnId() " +
+            StringUtils.stringifyException(e));
+      } finally {
+        close(rs, stmt, dbConn);
+      }
+    } catch (RetryException e) {
+      return findMinOpenTxnId();
+    }
+  }
+
+  /**
+   * See doc at {@link TxnStore#findMinOpenTxnId()}
+   * Note that {@link #openTxns(OpenTxnRequest)} makes the update of NEXT_TXN_ID and
+   * MIN_HISTORY_LEVEL a single atomic operation (and nothing else should update these
+   * tables except the cleaner, which only deletes rows from MIN_HISTORY_LEVEL and thus
+   * can only allow minOpenTxn to move higher)
+   */
+  private long findMinOpenTxnGLB(Statement stmt) throws MetaException, 
SQLException {
+    String s = "select ntxn_next from NEXT_TXN_ID";
+    LOG.debug("Going to execute query <" + s + ">");
+    ResultSet rs = stmt.executeQuery(s);
+    if (!rs.next()) {
+      throw new MetaException("Transaction tables not properly " +
+          "initialized, no record found in next_txn_id");
+    }
+    long hwm = rs.getLong(1);
+    s = "select min(mhl_min_open_txnid) from MIN_HISTORY_LEVEL";
+    LOG.debug("Going to execute query <" + s + ">");
+    rs = stmt.executeQuery(s);
+    rs.next();
+    long minOpenTxnId = rs.getLong(1);
+    if(rs.wasNull()) {
+      return hwm;
+    }
+    //since generating new txnid uses select for update on single row in 
NEXT_TXN_ID
+    assert hwm >= minOpenTxnId : "(hwm, minOpenTxnId)=(" + hwm + "," + 
minOpenTxnId + ")";
+    return minOpenTxnId;
+  }
 
   /**
    * This will remove an entry from the queue after
@@ -387,11 +438,16 @@ class CompactionTxnHandler extends TxnHandler {
           pStmt.setLong(paramCount++, info.highestWriteId);
         }
         LOG.debug("Going to execute update <" + s + ">");
-        if (pStmt.executeUpdate() < 1) {
+        if ((updCount = pStmt.executeUpdate()) < 1) {
           LOG.error("Expected to remove at least one row from 
completed_txn_components when " +
             "marking compaction entry as clean!");
         }
-
+        /**
+         * compaction may remove data from aborted txns above tc_writeid but it only guarantees to
+         * remove it up to (inclusive) tc_writeid, so it's critical to not 
remove metadata about
+         * aborted TXN_COMPONENTS above tc_writeid (and consequently about 
aborted txns).
+         * See {@link ql.txn.compactor.Cleaner.removeFiles()}
+         */
         s = "select distinct txn_id from TXNS, TXN_COMPONENTS where txn_id = 
tc_txnid and txn_state = '" +
           TXN_ABORTED + "' and tc_database = ? and tc_table = ?";
         if (info.highestWriteId != 0) s += " and tc_writeid <= ?";
@@ -480,7 +536,6 @@ class CompactionTxnHandler extends TxnHandler {
       markCleaned(info);
     }
   }
-
   /**
    * Clean up entries from TXN_TO_WRITE_ID table less than 
min_uncommited_txnid as found by
    * min(NEXT_TXN_ID.ntxn_next, min(MIN_HISTORY_LEVEL.mhl_min_open_txnid), 
min(Aborted TXNS.txn_id)).
@@ -502,30 +557,11 @@ class CompactionTxnHandler extends TxnHandler {
         // First need to find the min_uncommitted_txnid which is currently 
seen by any open transactions.
         // If there are no txns which are currently open or aborted in the 
system, then current value of
         // NEXT_TXN_ID.ntxn_next could be min_uncommitted_txnid.
-        String s = "select ntxn_next from NEXT_TXN_ID";
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
-        if (!rs.next()) {
-          throw new MetaException("Transaction tables not properly " +
-                  "initialized, no record found in next_txn_id");
-        }
-        long minUncommittedTxnId = rs.getLong(1);
-
-        // If there are any open txns, then the minimum of min_open_txnid from 
MIN_HISTORY_LEVEL table
-        // could be the min_uncommitted_txnid if lesser than 
NEXT_TXN_ID.ntxn_next.
-        s = "select min(mhl_min_open_txnid) from MIN_HISTORY_LEVEL";
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
-        if (rs.next()) {
-          long minOpenTxnId = rs.getLong(1);
-          if (minOpenTxnId > 0) {
-            minUncommittedTxnId = Math.min(minOpenTxnId, minUncommittedTxnId);
-          }
-        }
+        long minUncommittedTxnId = findMinOpenTxnGLB(stmt);
 
         // If there are aborted txns, then the minimum aborted txnid could be 
the min_uncommitted_txnid
         // if lesser than both NEXT_TXN_ID.ntxn_next and min(MIN_HISTORY_LEVEL 
.mhl_min_open_txnid).
-        s = "select min(txn_id) from TXNS where txn_state = " + 
quoteChar(TXN_ABORTED);
+        String s = "select min(txn_id) from TXNS where txn_state = " + 
quoteChar(TXN_ABORTED);
         LOG.debug("Going to execute query <" + s + ">");
         rs = stmt.executeQuery(s);
         if (rs.next()) {
@@ -534,7 +570,6 @@ class CompactionTxnHandler extends TxnHandler {
             minUncommittedTxnId = Math.min(minAbortedTxnId, 
minUncommittedTxnId);
           }
         }
-
         // As all txns below min_uncommitted_txnid are either committed or 
empty_aborted, we are allowed
         // to cleanup the entries less than min_uncommitted_txnid from the 
TXN_TO_WRITE_ID table.
         s = "delete from TXN_TO_WRITE_ID where t2w_txnid < " + 
minUncommittedTxnId;

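To make the new findMinOpenTxnGLB() behavior concrete, here is a tiny worked example with invented values (the real method runs the two queries shown above against NEXT_TXN_ID and MIN_HISTORY_LEVEL):

    // Invented values, mirroring the decision in findMinOpenTxnGLB():
    long hwm = 30L;        // select ntxn_next from NEXT_TXN_ID
    Long minOpen = 27L;    // min(mhl_min_open_txnid) from MIN_HISTORY_LEVEL, null if table empty
    long result = (minOpen == null) ? hwm : minOpen;   // -> 27
    // With no open transactions, MIN_HISTORY_LEVEL yields NULL and the method returns
    // hwm (30), which is consistent: every txn id below ntxn_next is then resolved.
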
http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 22ce007..ca47a44 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -357,6 +357,16 @@ public interface TxnStore extends Configurable {
   List<CompactionInfo> findReadyToClean() throws MetaException;
 
   /**
+   * Returns the smallest txnid that could be seen in open state across all 
active transactions in
+   * the system or {@code NEXT_TXN_ID.NTXN_NEXT} if there are no active 
transactions, i.e. the
+   * smallest txnid that can be seen as unresolved in the whole system.  Even 
if a transaction
+   * is opened concurrently with this call it cannot have an id less than what 
this method returns.
+   * @return transaction ID
+   */
+  @RetrySemantics.ReadOnly
+  long findMinOpenTxnId() throws MetaException;
+
+  /**
    * This will remove an entry from the queue after
    * it has been compacted.
    * 

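A hedged sketch of how a caller such as the compactor Cleaner could combine the new API with TxnUtils.createValidTxnListForCleaner() added later in this patch; whether the Cleaner does exactly this is not shown in these hunks, and getOpenTxns() is assumed to be the existing TxnStore call returning GetOpenTxnsResponse:

    // Sketch only.
    long minOpenTxnGLB = txnHandler.findMinOpenTxnId();
    GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns();      // assumed existing API
    ValidTxnList cleanerTxnList =
        TxnUtils.createValidTxnListForCleaner(openTxns, minOpenTxnGLB);
    // cleanerTxnList's high watermark is minOpenTxnGLB - 1, so nothing that an open
    // transaction could still touch is treated as cleanable.
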
http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 3bb1f0c..cd77b4e 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -19,8 +19,10 @@ package org.apache.hadoop.hive.metastore.txn;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
-import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -39,12 +41,30 @@ import java.util.Map;
 public class TxnUtils {
   private static final Logger LOG = LoggerFactory.getLogger(TxnUtils.class);
 
-  // Transactional stats states
-  static final public char STAT_OPEN = 'o';
-  static final public char STAT_INVALID = 'i';
-  static final public char STAT_COMMITTED = 'c';
-  static final public char STAT_OBSOLETE = 's';
-
+  public static ValidTxnList createValidTxnListForCleaner(GetOpenTxnsResponse 
txns, long minOpenTxnGLB) {
+    long highWaterMark = minOpenTxnGLB - 1;
+    long[] abortedTxns = new long[txns.getOpen_txnsSize()];
+    BitSet abortedBits = BitSet.valueOf(txns.getAbortedBits());
+    int i = 0;
+    for(long txnId : txns.getOpen_txns()) {
+      if(txnId > highWaterMark) {
+        break;
+      }
+      if(abortedBits.get(i)) {
+        abortedTxns[i] = txnId;
+      }
+      else {
+        assert false : JavaUtils.txnIdToString(txnId) + " is open and <= hwm:" 
+ highWaterMark;
+      }
+      ++i;
+    }
+    abortedTxns = Arrays.copyOf(abortedTxns, i);
+    BitSet bitSet = new BitSet(abortedTxns.length);
+    bitSet.set(0, abortedTxns.length);
+    //add ValidCleanerTxnList? - could be problematic for all the places that 
read it from
+    // string as they'd have to know which object to instantiate
+    return new ValidReadTxnList(abortedTxns, bitSet, highWaterMark, 
Long.MAX_VALUE);
+  }
   /**
    * Transform a {@link 
org.apache.hadoop.hive.metastore.api.TableValidWriteIds} to a
    * {@link org.apache.hadoop.hive.common.ValidCompactorWriteIdList}.  This 
assumes that the caller intends to
@@ -83,17 +103,6 @@ public class TxnUtils {
     }
   }
 
-  public static ValidReaderWriteIdList 
updateForCompactionQuery(ValidReaderWriteIdList ids) {
-    // This is based on the existing valid write ID list that was built for a 
select query;
-    // therefore we assume all the aborted txns, etc. were already accounted 
for.
-    // All we do is adjust the high watermark to only include contiguous txns.
-    Long minOpenWriteId = ids.getMinOpenWriteId();
-    if (minOpenWriteId != null && minOpenWriteId != Long.MAX_VALUE) {
-      return ids.updateHighWatermark(ids.getMinOpenWriteId() - 1);
-    }
-    return ids;
-  }
-
   /**
    * Get an instance of the TxnStore that is appropriate for this store
    * @param conf configuration

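A small worked example of what createValidTxnListForCleaner() produces, with invented transaction ids; the ValidReadTxnList constructor used is the one visible in the hunk above (imports assumed: java.util.BitSet, org.apache.hadoop.hive.common.ValidReadTxnList/ValidTxnList):

    // Suppose GetOpenTxnsResponse lists txns {7 (aborted), 9 (aborted), 12 (open)}
    // and findMinOpenTxnId() returned 12, so minOpenTxnGLB = 12 and hwm = 11.
    // Only aborted txns <= hwm become exceptions; the result is equivalent to:
    long[] aborted = {7L, 9L};
    BitSet abortedBits = new BitSet(aborted.length);
    abortedBits.set(0, aborted.length);               // every exception is marked aborted
    ValidTxnList list = new ValidReadTxnList(aborted, abortedBits, 11L, Long.MAX_VALUE);
    // Txn 12 itself sits above the high watermark, so data it may still write is
    // never considered obsolete by the cleaner.
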
http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
index 2c9d98b..2a70ec3 100644
--- 
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
+++ 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
@@ -187,63 +187,6 @@ public class TestHiveMetaStoreTxns {
   }
 
   @Test
-  public void testTxnRange() throws Exception {
-    ValidTxnList validTxns = client.getValidTxns();
-    Assert.assertEquals(ValidTxnList.RangeResponse.NONE,
-        validTxns.isTxnRangeValid(1L, 3L));
-    List<Long> tids = client.openTxns("me", 5).getTxn_ids();
-
-    HeartbeatTxnRangeResponse rsp = client.heartbeatTxnRange(1, 5);
-    Assert.assertEquals(0, rsp.getNosuch().size());
-    Assert.assertEquals(0, rsp.getAborted().size());
-
-    client.rollbackTxn(1L);
-    client.commitTxn(2L);
-    client.commitTxn(3L);
-    client.commitTxn(4L);
-    validTxns = client.getValidTxns();
-    System.out.println("validTxns = " + validTxns);
-    Assert.assertEquals(ValidTxnList.RangeResponse.ALL,
-        validTxns.isTxnRangeValid(2L, 2L));
-    Assert.assertEquals(ValidTxnList.RangeResponse.ALL,
-        validTxns.isTxnRangeValid(2L, 3L));
-    Assert.assertEquals(ValidTxnList.RangeResponse.ALL,
-        validTxns.isTxnRangeValid(2L, 4L));
-    Assert.assertEquals(ValidTxnList.RangeResponse.ALL,
-        validTxns.isTxnRangeValid(3L, 4L));
-
-    Assert.assertEquals(ValidTxnList.RangeResponse.SOME,
-        validTxns.isTxnRangeValid(1L, 4L));
-    Assert.assertEquals(ValidTxnList.RangeResponse.SOME,
-        validTxns.isTxnRangeValid(2L, 5L));
-    Assert.assertEquals(ValidTxnList.RangeResponse.SOME,
-        validTxns.isTxnRangeValid(1L, 2L));
-    Assert.assertEquals(ValidTxnList.RangeResponse.SOME,
-        validTxns.isTxnRangeValid(4L, 5L));
-
-    Assert.assertEquals(ValidTxnList.RangeResponse.NONE,
-        validTxns.isTxnRangeValid(1L, 1L));
-    Assert.assertEquals(ValidTxnList.RangeResponse.NONE,
-        validTxns.isTxnRangeValid(5L, 10L));
-
-    validTxns = new ValidReadTxnList("10:5:4,5,6:");
-    Assert.assertEquals(ValidTxnList.RangeResponse.NONE,
-        validTxns.isTxnRangeValid(4,6));
-    Assert.assertEquals(ValidTxnList.RangeResponse.ALL,
-        validTxns.isTxnRangeValid(7, 10));
-    Assert.assertEquals(ValidTxnList.RangeResponse.SOME,
-        validTxns.isTxnRangeValid(7, 11));
-    Assert.assertEquals(ValidTxnList.RangeResponse.SOME,
-        validTxns.isTxnRangeValid(3, 6));
-    Assert.assertEquals(ValidTxnList.RangeResponse.SOME,
-        validTxns.isTxnRangeValid(4, 7));
-    Assert.assertEquals(ValidTxnList.RangeResponse.SOME,
-        validTxns.isTxnRangeValid(1, 12));
-    Assert.assertEquals(ValidTxnList.RangeResponse.ALL,
-        validTxns.isTxnRangeValid(1, 3));
-  }
-
-  @Test
   public void testLocks() throws Exception {
     LockRequestBuilder rqstBuilder = new LockRequestBuilder();
     rqstBuilder.addLockComponent(new LockComponentBuilder()

http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorWriteIdList.java
----------------------------------------------------------------------
diff --git 
a/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorWriteIdList.java
 
b/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorWriteIdList.java
index a849264..5d74241 100644
--- 
a/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorWriteIdList.java
+++ 
b/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorWriteIdList.java
@@ -80,6 +80,9 @@ public class ValidCompactorWriteIdList extends 
ValidReaderWriteIdList {
   /**
    * Returns org.apache.hadoop.hive.common.ValidWriteIdList.RangeResponse.ALL 
if all write ids in
    * the range are resolved and RangeResponse.NONE otherwise
+   * Streaming ingest may create something like delta_11_20.  The compactor cannot
+   * include such a delta in a compaction until all transactions that write to it
+   * terminate.  (Otherwise the compactor would produce a delta D1 that violates the
+   * invariant that for any other delta D2, either D1 intersect D2 is empty or
+   * D1 intersect D2 = D2.)
    */
   @Override
   public RangeResponse isWriteIdRangeValid(long minWriteId, long maxWriteId) {

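A short illustration of the comment above, with invented write ids:

    Suppose a streaming-ingest delta_11_20 exists and write id 15 within it is still open:
      isWriteIdRangeValid(11, 20) -> RangeResponse.NONE  (15 is unresolved, so the whole
                                                          delta is excluded from compaction)
      isWriteIdRangeValid(1, 10)  -> RangeResponse.ALL   (every id in the range is resolved)
    Taking a delta either wholly or not at all is what preserves the invariant that any
    two deltas are disjoint or nested.
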
http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
----------------------------------------------------------------------
diff --git 
a/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
 
b/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
index 95a0b56..bc8ac0d 100644
--- 
a/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
+++ 
b/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
@@ -35,6 +35,12 @@ public class ValidReaderWriteIdList implements 
ValidWriteIdList {
   private long minOpenWriteId = Long.MAX_VALUE;
   protected long highWatermark;
 
+  /**
+   * This seems like a bad c'tor.  It doesn't even have a table name in it and 
it's used every time
+   * ValidWriteIdList.VALID_WRITEIDS_KEY is not found in Configuration.
+   * But, if anything, that would indicate a bug if it was done for an acid read,
+   * since it considers everything valid - this should not be assumed.
+   */
   public ValidReaderWriteIdList() {
     this(null, new long[0], new BitSet(), Long.MAX_VALUE, Long.MAX_VALUE);
   }

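A one-line illustration of why the comment above warns about this constructor (isWriteIdValid() is assumed to follow the usual ValidWriteIdList contract):

    ValidReaderWriteIdList everythingValid = new ValidReaderWriteIdList();
    // highWatermark == Long.MAX_VALUE and there are no exceptions, so
    // everythingValid.isWriteIdValid(w) is true for any w - an acid read built on this
    // list would treat open or aborted write ids as committed.
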
http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java 
b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index 50433b6..2170178 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -34,6 +34,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -52,6 +53,8 @@ import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.Validator;
@@ -66,10 +69,12 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.api.TxnInfo;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
+import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
@@ -412,13 +417,13 @@ public class TestStreaming {
     rs = queryTable(driver, "select ROW__ID, a, b, INPUT__FILE__NAME from 
default.streamingnobuckets order by ROW__ID");
 
     Assert.assertTrue(rs.get(0), 
rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
-    Assert.assertTrue(rs.get(0), 
rs.get(0).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
+    Assert.assertTrue(rs.get(0), 
rs.get(0).endsWith("streamingnobuckets/base_0000005_v0000025/bucket_00000"));
     Assert.assertTrue(rs.get(1), 
rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
-    Assert.assertTrue(rs.get(1), 
rs.get(1).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
+    Assert.assertTrue(rs.get(1), 
rs.get(1).endsWith("streamingnobuckets/base_0000005_v0000025/bucket_00000"));
     Assert.assertTrue(rs.get(2), 
rs.get(2).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
-    Assert.assertTrue(rs.get(2), 
rs.get(2).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
+    Assert.assertTrue(rs.get(2), 
rs.get(2).endsWith("streamingnobuckets/base_0000005_v0000025/bucket_00000"));
     Assert.assertTrue(rs.get(3), 
rs.get(3).startsWith("{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t0\t0"));
-    Assert.assertTrue(rs.get(3), 
rs.get(3).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
+    Assert.assertTrue(rs.get(3), 
rs.get(3).endsWith("streamingnobuckets/base_0000005_v0000025/bucket_00000"));
   }
 
   @Test
@@ -762,17 +767,17 @@ public class TestStreaming {
     rs = queryTable(driver, "select ROW__ID, a, b, INPUT__FILE__NAME from 
default.streamingnobuckets order by ROW__ID");
 
     Assert.assertTrue(rs.get(0), 
rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
-    Assert.assertTrue(rs.get(0), 
rs.get(0).endsWith("streamingnobuckets/base_0000009/bucket_00000"));
+    Assert.assertTrue(rs.get(0), 
rs.get(0).endsWith("streamingnobuckets/base_0000009_v0000029/bucket_00000"));
     Assert.assertTrue(rs.get(1), 
rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
-    Assert.assertTrue(rs.get(1), 
rs.get(1).endsWith("streamingnobuckets/base_0000009/bucket_00000"));
+    Assert.assertTrue(rs.get(1), 
rs.get(1).endsWith("streamingnobuckets/base_0000009_v0000029/bucket_00000"));
     Assert.assertTrue(rs.get(2), 
rs.get(2).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
-    Assert.assertTrue(rs.get(2), 
rs.get(2).endsWith("streamingnobuckets/base_0000009/bucket_00000"));
+    Assert.assertTrue(rs.get(2), 
rs.get(2).endsWith("streamingnobuckets/base_0000009_v0000029/bucket_00000"));
     Assert.assertTrue(rs.get(3), 
rs.get(3).startsWith("{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\ta11\tb12"));
-    Assert.assertTrue(rs.get(3), 
rs.get(3).endsWith("streamingnobuckets/base_0000009/bucket_00000"));
+    Assert.assertTrue(rs.get(3), 
rs.get(3).endsWith("streamingnobuckets/base_0000009_v0000029/bucket_00000"));
     Assert.assertTrue(rs.get(4), 
rs.get(4).startsWith("{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\ta13\tb14"));
-    Assert.assertTrue(rs.get(4), 
rs.get(4).endsWith("streamingnobuckets/base_0000009/bucket_00000"));
+    Assert.assertTrue(rs.get(4), 
rs.get(4).endsWith("streamingnobuckets/base_0000009_v0000029/bucket_00000"));
     Assert.assertTrue(rs.get(5), 
rs.get(5).startsWith("{\"writeid\":6,\"bucketid\":536870912,\"rowid\":0}\t0\t0"));
-    Assert.assertTrue(rs.get(5), 
rs.get(5).endsWith("streamingnobuckets/base_0000009/bucket_00000"));
+    Assert.assertTrue(rs.get(5), 
rs.get(5).endsWith("streamingnobuckets/base_0000009_v0000029/bucket_00000"));
   }
 
   /**
@@ -940,7 +945,7 @@ public class TestStreaming {
   @Deprecated
   private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, 
int buckets, int numExpectedFiles,
     String... records) throws Exception {
-    ValidWriteIdList writeIds = 
msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName));
+    ValidWriteIdList writeIds = getTransactionContext(conf);
     AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, 
writeIds);
     Assert.assertEquals(0, dir.getObsolete().size());
     Assert.assertEquals(0, dir.getOriginalFiles().size());
@@ -973,7 +978,8 @@ public class TestStreaming {
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string");
     AcidUtils.setAcidOperationalProperties(job, true, null);
     job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
-    job.set(ValidWriteIdList.VALID_WRITEIDS_KEY, writeIds.toString());
+    job.set(ValidWriteIdList.VALID_WRITEIDS_KEY, writeIds.writeToString());
+    job.set(ValidTxnList.VALID_TXNS_KEY, 
conf.get(ValidTxnList.VALID_TXNS_KEY));
     InputSplit[] splits = inf.getSplits(job, buckets);
     Assert.assertEquals(numExpectedFiles, splits.length);
     org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
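The hunk above shows the reader-side contract this patch tightens: an acid read now needs both the table's write id list and the global transaction list serialized into the job conf. A small illustrative helper (names are not Hive API) that captures just that:

import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.mapred.JobConf;

public class AcidReadConfSketch {
  /** Serializes both snapshots into the job conf before getSplits() is called. */
  static void setAcidSnapshots(JobConf job, ValidWriteIdList writeIds, ValidTxnList txns) {
    job.set(ValidWriteIdList.VALID_WRITEIDS_KEY, writeIds.writeToString());
    job.set(ValidTxnList.VALID_TXNS_KEY, txns.writeToString());
  }
}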
@@ -994,8 +1000,7 @@ public class TestStreaming {
    */
   private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, 
int numExpectedFiles,
     String validationQuery, boolean vectorize, String... records) throws 
Exception {
-    ValidWriteIdList txns = 
msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName));
-    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, 
txns);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, 
getTransactionContext(conf));
     Assert.assertEquals(0, dir.getObsolete().size());
     Assert.assertEquals(0, dir.getOriginalFiles().size());
     List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -1038,9 +1043,15 @@ public class TestStreaming {
     conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, 
isVectorizationEnabled);
   }
 
+  private ValidWriteIdList getTransactionContext(Configuration conf) throws 
Exception {
+    ValidTxnList validTxnList = msClient.getValidTxns();
+    conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+    List<TableValidWriteIds> v = msClient.getValidWriteIds(Collections
+        .singletonList(TableName.getDbTable(dbName, tblName)), 
validTxnList.writeToString());
+    return TxnCommonUtils.createValidReaderWriteIdList(v.get(0));
+  }
   private void checkNothingWritten(Path partitionPath) throws Exception {
-    ValidWriteIdList writeIds = 
msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName));
-    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, 
writeIds);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, 
getTransactionContext(conf));
     Assert.assertEquals(0, dir.getObsolete().size());
     Assert.assertEquals(0, dir.getOriginalFiles().size());
     List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
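The getTransactionContext helper added above is the pattern worth noting: snapshot the transactions first, publish them in the conf, and only then resolve the table's write ids and call AcidUtils.getAcidState. A self-contained sketch of the same steps (IMetaStoreClient is assumed here as the type of the test's msClient; everything else appears in the hunk above):

import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;

public class TransactionContextSketch {
  static ValidWriteIdList snapshot(IMetaStoreClient client, Configuration conf,
                                   String dbName, String tblName) throws Exception {
    // Global snapshot first, published in the conf for anything (e.g. AcidUtils.getAcidState)
    // that needs to judge which directories are visible.
    ValidTxnList validTxnList = client.getValidTxns();
    conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
    // Resolve this table's write ids against that same snapshot.
    List<TableValidWriteIds> ids = client.getValidWriteIds(
        Collections.singletonList(TableName.getDbTable(dbName, tblName)),
        validTxnList.writeToString());
    return TxnCommonUtils.createValidReaderWriteIdList(ids.get(0));
  }
}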
@@ -1937,8 +1948,7 @@ public class TestStreaming {
     /*now both batches have committed (but not closed), so for each primary file we expect a side
     file to exist and indicate the true length of the primary file*/
     FileSystem fs = partLoc.getFileSystem(conf);
-    AcidUtils.Directory dir = AcidUtils.getAcidState(partLoc, conf,
-      msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName)));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(partLoc, conf, 
getTransactionContext(conf));
     for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
       for (FileStatus stat : fs.listStatus(pd.getPath(), 
AcidUtils.bucketFileFilter)) {
         Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
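The side files asserted on above carry the committed length of each bucket file while a streaming writer is still active. A hedged sketch of reading that length, assuming OrcAcidUtils is the ORC helper the test already uses and that getLastFlushLength(fs, bucketFile) is its companion accessor:

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.orc.impl.OrcAcidUtils;

public class SideFileSketch {
  /** Committed length recorded in the bucket file's _flush_length side file, or -1 if there is none. */
  static long committedLength(FileSystem fs, Path bucketFile) throws Exception {
    Path lengthFile = OrcAcidUtils.getSideFile(bucketFile);
    if (!fs.exists(lengthFile)) {
      return -1;
    }
    // While writers are active, the side file (not the physical file size) tells readers
    // how much of the bucket file is actually committed.
    return OrcAcidUtils.getLastFlushLength(fs, bucketFile);
  }
}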
@@ -1963,7 +1973,7 @@ public class TestStreaming {
     //so each of the 2 deltas has 1 bucket0 and 1 bucket0_flush_length.  Furthermore, each bucket0
     //has now received more data (logically - it's buffered) but it is not yet committed.
     //let's check that the side files exist, etc.
-    dir = AcidUtils.getAcidState(partLoc, conf, 
msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName)));
+    dir = AcidUtils.getAcidState(partLoc, conf, getTransactionContext(conf));
     for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
       for (FileStatus stat : fs.listStatus(pd.getPath(), 
AcidUtils.bucketFileFilter)) {
         Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
