analyze the transactional anomalies described by P. Baillis et al.

Change-Id: I584f3b5f0cab9a9867f8c47a52acc2fc6df07645


Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/cad28cc9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/cad28cc9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/cad28cc9

Branch: refs/heads/master
Commit: cad28cc9d4a1f9aaf03ec7584764dd385d02494a
Parents: bcdd3d1
Author: Igor Katkov <katk...@yahoo-inc.com>
Authored: Wed Apr 27 18:15:08 2016 -0700
Committer: Igor Katkov <katk...@yahoo-inc.com>
Committed: Thu Apr 28 17:07:23 2016 -0700

----------------------------------------------------------------------
 .../apache/omid/benchmarks/tso/RawTxRunner.java |   2 +-
 .../TestBaillisAnomaliesWithTXs.java            | 601 +++++++++++++++++++
 2 files changed, 602 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/cad28cc9/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java
----------------------------------------------------------------------
diff --git 
a/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java 
b/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java
index ef7bb06..f18fb56 100644
--- a/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java
+++ b/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java
@@ -163,7 +163,7 @@ class RawTxRunner implements Runnable {
 
     }
 
-    public void stop() {
+    void stop() {
         isRunning = false;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/cad28cc9/hbase-client/src/test/java/org/apache/omid/transaction/TestBaillisAnomaliesWithTXs.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/test/java/org/apache/omid/transaction/TestBaillisAnomaliesWithTXs.java
 
b/hbase-client/src/test/java/org/apache/omid/transaction/TestBaillisAnomaliesWithTXs.java
new file mode 100644
index 0000000..5bfc8d5
--- /dev/null
+++ 
b/hbase-client/src/test/java/org/apache/omid/transaction/TestBaillisAnomaliesWithTXs.java
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.transaction;
+
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.testng.ITestContext;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.slf4j.LoggerFactory.getLogger;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.fail;
+
+/**
+ * These tests try to analyze the transactional anomalies described by P. 
Baillis et al. in
+ * http://arxiv.org/pdf/1302.0309.pdf
+ *
+ * These tests try to model what project Hermitage is trying to do to compare 
the behavior of different DBMSs on these
+ * anomalies depending on the different isolation levels they offer. For more 
info on the Hermitage project, please
+ * refer to: https://github.com/ept/hermitage
+ *
+ * Transactional histories have been translated to HBase from the ones done 
for Postgresql in the Hermitage project:
+ * https://github.com/ept/hermitage/blob/master/postgres.md
+ *
+ * The "repeatable read" Postgresql isolation level is equivalent to "snapshot 
isolation", so we include the experiments
+ * for that isolation level
+ *
+ * With HBase 0.98 interfaces is not possible to execute updates/deletes based 
on predicates so the examples here are
+ * not exactly the same as in Postgres
+ */
+@Test(groups = "sharedHBase")
+public class TestBaillisAnomaliesWithTXs extends OmidTestBase {
+
+    private static final Logger LOG = 
getLogger(TestBaillisAnomaliesWithTXs.class);
+    private static final String TEST_COLUMN = "baillis-col";
+
+
+    // Data used in the tests
+    private byte[] famName = Bytes.toBytes(TEST_FAMILY);
+    private byte[] colName = Bytes.toBytes(TEST_COLUMN);
+
+    private byte[] rowId1 = Bytes.toBytes("row1");
+    private byte[] rowId2 = Bytes.toBytes("row2");
+    private byte[] rowId3 = Bytes.toBytes("row3");
+
+    private byte[] dataValue1 = Bytes.toBytes(10);
+    private byte[] dataValue2 = Bytes.toBytes(20);
+    private byte[] dataValue3 = Bytes.toBytes(30);
+
+
+    @Test
+    public void 
testSIPreventsPredicateManyPrecedersForReadPredicates(ITestContext context) 
throws Exception {
+        // TX History for PMP for Read Predicate:
+        // begin; set transaction isolation level repeatable read; -- T1
+        // begin; set transaction isolation level repeatable read; -- T2
+        // select * from test where value = 30; -- T1. Returns nothing
+        // insert into test (id, value) values(3, 30); -- T2
+        // commit; -- T2
+        // select * from test where value % 3 = 0; -- T1. Still returns nothing
+        // commit; -- T1
+
+        // 0) Start transactions
+        TransactionManager tm = newTransactionManager(context);
+        TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+
+        Transaction tx1 = tm.begin();
+        Transaction tx2 = tm.begin();
+
+        // 1) select * from test where value = 30; -- T1. Returns nothing
+        Scan scan = new Scan();
+        Filter f = new SingleColumnValueFilter(famName, colName, 
CompareFilter.CompareOp.EQUAL, Bytes.toBytes(30));
+        scan.setFilter(f);
+        ResultScanner tx1Scanner = txTable.getScanner(tx1, scan);
+        assertNull(tx1Scanner.next());
+
+        // 2) insert into test (id, value) values(3, 30); -- T2
+        Put newRow = new Put(rowId3);
+        newRow.add(famName, colName, dataValue3);
+        txTable.put(tx2, newRow);
+
+        // 3) Commit TX 2
+        tm.commit(tx2);
+
+        // 4) select * from test where value % 3 = 0; -- T1. Still returns 
nothing
+        tx1Scanner = txTable.getScanner(tx1, scan);
+        assertNull(tx1Scanner.next());
+
+        // 5) Commit TX 1
+        tm.commit(tx1);
+    }
+
+    @Test
+    public void 
testSIPreventsPredicateManyPrecedersForWritePredicates(ITestContext context) 
throws Exception {
+        // TX History for PMP for Write Predicate:
+        // begin; set transaction isolation level repeatable read; -- T1
+        // begin; set transaction isolation level repeatable read; -- T2
+        // update test set value = value + 10; -- T1
+        // delete from test where value = 20; -- T2, BLOCKS
+        // commit; -- T1. T2 now prints out "ERROR: could not serialize access 
due to concurrent update"
+        // abort; -- T2. There's nothing else we can do, this transaction has 
failed
+
+        // 0) Start transactions
+        TransactionManager tm = newTransactionManager(context);
+        TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+        Transaction tx1 = tm.begin();
+        Transaction tx2 = tm.begin();
+
+        // 1) update test set value = value + 10; -- T1
+        Scan updateScan = new Scan();
+        ResultScanner tx1Scanner = txTable.getScanner(tx2, updateScan);
+        Result updateRes = tx1Scanner.next();
+        int count = 0;
+        while (updateRes != null) {
+            LOG.info("RESSS {}", updateRes);
+            Put row = new Put(updateRes.getRow());
+            int val = Bytes.toInt(updateRes.getValue(famName, colName));
+            LOG.info("Updating row id {} with value {}", 
Bytes.toString(updateRes.getRow()), val);
+            row.add(famName, colName, Bytes.toBytes(val + 10));
+            txTable.put(tx1, row);
+            updateRes = tx1Scanner.next();
+            count++;
+        }
+        assertEquals(count, 2);
+
+        // 2) delete from test where value = 20; -- T2, BLOCKS
+        Scan scan = new Scan();
+        Filter f = new SingleColumnValueFilter(famName, colName, 
CompareFilter.CompareOp.EQUAL, Bytes.toBytes(20));
+        scan.setFilter(f);
+        ResultScanner tx2Scanner = txTable.getScanner(tx2, scan);
+        // assertEquals(tx2Scanner.next(100).length, 1);
+        Result res = tx2Scanner.next();
+        int count20 = 0;
+        while (res != null) {
+            LOG.info("RESSS {}", res);
+            LOG.info("Deleting row id {} with value {}", 
Bytes.toString(res.getRow()),
+                     Bytes.toInt(res.getValue(famName, colName)));
+            Delete delete20 = new Delete(res.getRow());
+            txTable.delete(tx2, delete20);
+            res = tx2Scanner.next();
+            count20++;
+        }
+        assertEquals(count20, 1);
+        // 3) commit TX 1
+        tm.commit(tx1);
+
+        tx2Scanner = txTable.getScanner(tx2, scan);
+        assertNull(tx2Scanner.next());
+
+        // 4) commit TX 2 -> Should be rolled-back
+        try {
+            tm.commit(tx2);
+            fail();
+        } catch (RollbackException e) {
+            // Expected
+        }
+
+    }
+
+    @Test
+    public void testSIPreventsLostUpdates(ITestContext context) throws 
Exception {
+        // TX History for P4:
+        // begin; set transaction isolation level repeatable read; -- T1
+        // begin; set transaction isolation level repeatable read; -- T2
+        // select * from test where id = 1; -- T1
+        // select * from test where id = 1; -- T2
+        // update test set value = 11 where id = 1; -- T1
+        // update test set value = 11 where id = 1; -- T2, BLOCKS
+        // commit; -- T1. T2 now prints out "ERROR: could not serialize access 
due to concurrent update"
+        // abort;  -- T2. There's nothing else we can do, this transaction has 
failed
+
+        // 0) Start transactions
+        TransactionManager tm = newTransactionManager(context);
+        TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+        Transaction tx1 = tm.begin();
+        Transaction tx2 = tm.begin();
+
+        Scan scan = new Scan(rowId1, rowId1);
+        scan.addColumn(famName, colName);
+
+        // 1) select * from test where id = 1; -- T1
+        ResultScanner tx1Scanner = txTable.getScanner(tx1, scan);
+        Result res = tx1Scanner.next();
+        int count = 0;
+        while (res != null) {
+            LOG.info("RESSS {}", res);
+            LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()),
+                     Bytes.toString(res.getValue(famName, colName)));
+            assertEquals(res.getRow(), rowId1);
+            assertEquals(res.getValue(famName, colName), dataValue1);
+            res = tx1Scanner.next();
+            count++;
+        }
+        assertEquals(count, 1);
+
+        // 2) select * from test where id = 1; -- T2
+        ResultScanner tx2Scanner = txTable.getScanner(tx2, scan);
+        res = tx2Scanner.next();
+        count = 0;
+        while (res != null) {
+            LOG.info("RESSS {}", res);
+            LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()),
+                     Bytes.toString(res.getValue(famName, colName)));
+            assertEquals(res.getRow(), rowId1);
+            assertEquals(res.getValue(famName, colName), dataValue1);
+            res = tx2Scanner.next();
+            count++;
+        }
+        assertEquals(count, 1);
+
+        // 3) update test set value = 11 where id = 1; -- T1
+        Put updateRow1Tx1 = new Put(rowId1);
+        updateRow1Tx1.add(famName, colName, Bytes.toBytes("11"));
+        txTable.put(tx1, updateRow1Tx1);
+
+        // 4) update test set value = 11 where id = 1; -- T2
+        Put updateRow1Tx2 = new Put(rowId1);
+        updateRow1Tx2.add(famName, colName, Bytes.toBytes("11"));
+        txTable.put(tx2, updateRow1Tx2);
+
+        // 5) commit -- T1
+        tm.commit(tx1);
+
+        // 6) commit -- T2 --> should be rolled-back
+        try {
+            tm.commit(tx2);
+            fail();
+        } catch (RollbackException e) {
+            // Expected
+        }
+
+    }
+
+    @Test
+    public void testSIPreventsReadSkew(ITestContext context) throws Exception {
+        // TX History for G-single:
+        // begin; set transaction isolation level repeatable read; -- T1
+        // begin; set transaction isolation level repeatable read; -- T2
+        // select * from test where id = 1; -- T1. Shows 1 => 10
+        // select * from test where id = 1; -- T2
+        // select * from test where id = 2; -- T2
+        // update test set value = 12 where id = 1; -- T2
+        // update test set value = 18 where id = 2; -- T2
+        // commit; -- T2
+        // select * from test where id = 2; -- T1. Shows 2 => 20
+        // commit; -- T1
+
+        // 0) Start transactions
+        TransactionManager tm = newTransactionManager(context);
+        TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+        Transaction tx1 = tm.begin();
+        Transaction tx2 = tm.begin();
+
+        Scan rowId1Scan = new Scan(rowId1, rowId1);
+        rowId1Scan.addColumn(famName, colName);
+
+        // 1) select * from test where id = 1; -- T1. Shows 1 => 10
+        ResultScanner tx1Scanner = txTable.getScanner(tx1, rowId1Scan);
+        Result res = tx1Scanner.next();
+        int count = 0;
+        while (res != null) {
+            LOG.info("RESSS {}", res);
+            LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()),
+                     Bytes.toString(res.getValue(famName, colName)));
+            assertEquals(res.getRow(), rowId1);
+            assertEquals(res.getValue(famName, colName), dataValue1);
+            res = tx1Scanner.next();
+            count++;
+        }
+        assertEquals(count, 1);
+
+        // 2) select * from test where id = 1; -- T2
+        ResultScanner tx2Scanner = txTable.getScanner(tx2, rowId1Scan);
+        res = tx2Scanner.next();
+        count = 0;
+        while (res != null) {
+            LOG.info("RESSS {}", res);
+            LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()),
+                     Bytes.toString(res.getValue(famName, colName)));
+            assertEquals(res.getRow(), rowId1);
+            assertEquals(res.getValue(famName, colName), dataValue1);
+            res = tx2Scanner.next();
+            count++;
+        }
+
+        Scan rowId2Scan = new Scan(rowId2, rowId2);
+        rowId2Scan.addColumn(famName, colName);
+
+        // 3) select * from test where id = 2; -- T2
+        tx2Scanner = txTable.getScanner(tx2, rowId2Scan);
+        res = tx2Scanner.next();
+        count = 0;
+        while (res != null) {
+            LOG.info("RESSS {}", res);
+            LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()),
+                     Bytes.toString(res.getValue(famName, colName)));
+            assertEquals(res.getRow(), rowId2);
+            assertEquals(res.getValue(famName, colName), dataValue2);
+            res = tx2Scanner.next();
+            count++;
+        }
+
+        // 4) update test set value = 12 where id = 1; -- T2
+        Put updateRow1Tx2 = new Put(rowId1);
+        updateRow1Tx2.add(famName, colName, Bytes.toBytes("12"));
+        txTable.put(tx1, updateRow1Tx2);
+
+        // 5) update test set value = 18 where id = 1; -- T2
+        Put updateRow2Tx2 = new Put(rowId2);
+        updateRow2Tx2.add(famName, colName, Bytes.toBytes("18"));
+        txTable.put(tx2, updateRow2Tx2);
+
+        // 6) commit -- T2
+        tm.commit(tx2);
+
+        // 7) select * from test where id = 2; -- T1. Shows 2 => 20
+        tx1Scanner = txTable.getScanner(tx1, rowId2Scan);
+        res = tx1Scanner.next();
+        count = 0;
+        while (res != null) {
+            LOG.info("RESSS {}", res);
+            LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()),
+                     Bytes.toString(res.getValue(famName, colName)));
+            assertEquals(res.getRow(), rowId2);
+            assertEquals(res.getValue(famName, colName), dataValue2);
+            res = tx1Scanner.next();
+            count++;
+        }
+
+        // 8) commit -- T1
+        tm.commit(tx1);
+
+    }
+
+    @Test
+    public void testSIPreventsReadSkewUsingWritePredicate(ITestContext 
context) throws Exception {
+        // TX History for G-single:
+        // begin; set transaction isolation level repeatable read; -- T1
+        // begin; set transaction isolation level repeatable read; -- T2
+        // select * from test where id = 1; -- T1. Shows 1 => 10
+        // select * from test; -- T2
+        // update test set value = 12 where id = 1; -- T2
+        // update test set value = 18 where id = 2; -- T2
+        // commit; -- T2
+        // delete from test where value = 20; -- T1. Prints "ERROR: could not 
serialize access due to concurrent update"
+        // abort; -- T1. There's nothing else we can do, this transaction has 
failed
+
+        // 0) Start transactions
+        TransactionManager tm = newTransactionManager(context);
+        TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+        Transaction tx1 = tm.begin();
+        Transaction tx2 = tm.begin();
+
+        // 1) select * from test; -- T1
+        assertNumberOfRows(txTable, tx1, 2, new Scan());
+
+        // 2) select * from test; -- T2
+        assertNumberOfRows(txTable, tx2, 2, new Scan());
+
+        // 3) update test set value = 12 where id = 1; -- T2
+        // 4) update test set value = 18 where id = 2; -- T2
+        Put updateRow1Tx2 = new Put(rowId1);
+        updateRow1Tx2.add(famName, colName, Bytes.toBytes(12));
+        Put updateRow2Tx2 = new Put(rowId2);
+        updateRow2Tx2.add(famName, colName, Bytes.toBytes(18));
+        txTable.put(tx2, Arrays.asList(updateRow1Tx2, updateRow2Tx2));
+
+        // 5) commit; -- T2
+        tm.commit(tx2);
+
+        // 6) delete from test where value = 20; -- T1. Prints
+        // "ERROR: could not serialize access due to concurrent update"
+        Filter f = new SingleColumnValueFilter(famName, colName, 
CompareFilter.CompareOp.EQUAL, Bytes.toBytes(20));
+        Scan checkFor20 = new Scan();
+        checkFor20.setFilter(f);
+        ResultScanner checkFor20Scanner = txTable.getScanner(tx1, checkFor20);
+        Result res = checkFor20Scanner.next();
+        while (res != null) {
+            LOG.info("RESSS {}", res);
+            LOG.info("Deleting row id {} with value {}", 
Bytes.toString(res.getRow()), Bytes.toInt(res.getValue(famName, colName)));
+            Delete delete20 = new Delete(res.getRow());
+            txTable.delete(tx1, delete20);
+            res = checkFor20Scanner.next();
+        }
+
+        // 7) abort; -- T1
+        try {
+            tm.commit(tx1);
+            fail("Should be aborted");
+        } catch (RollbackException e) {
+            // Expected
+        }
+
+    }
+
+    // this test shows that Omid does not provide serilizable level of 
isolation other wise last commit would have failed
+    @Test
+    public void testSIDoesNotPreventWriteSkew(ITestContext context) throws 
Exception {
+        // TX History for G2-item:
+        // begin; set transaction isolation level repeatable read; -- T1
+        // begin; set transaction isolation level repeatable read; -- T2
+        // select * from test where id in (1,2); -- T1
+        // select * from test where id in (1,2); -- T2
+        // update test set value = 11 where id = 1; -- T1
+        // update test set value = 21 where id = 2; -- T2
+        // commit; -- T1
+        // commit; -- T2
+
+        // 0) Start transactions
+        TransactionManager tm = newTransactionManager(context);
+        TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+        Transaction tx1 = tm.begin();
+        Transaction tx2 = tm.begin();
+
+        Scan rowId12Scan = new Scan(rowId1, rowId3);
+        rowId12Scan.addColumn(famName, colName);
+
+        // 1) select * from test where id in (1,2); -- T1
+        ResultScanner tx1Scanner = txTable.getScanner(tx1, rowId12Scan);
+        Result res = tx1Scanner.next();
+        int count = 0;
+        while (res != null) {
+            LOG.info("RESSS {}", res);
+            LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()), 
Bytes.toInt(res.getValue(famName, colName)));
+            switch (count) {
+                case 0:
+                    assertEquals(res.getRow(), rowId1);
+                    assertEquals(res.getValue(famName, colName), dataValue1);
+                    break;
+                case 1:
+                    assertEquals(res.getRow(), rowId2);
+                    assertEquals(res.getValue(famName, colName), dataValue2);
+                    break;
+                default:
+                    fail();
+            }
+            res = tx1Scanner.next();
+            count++;
+        }
+        assertEquals(count, 2);
+
+        // 2) select * from test where id in (1,2); -- T2
+        ResultScanner tx2Scanner = txTable.getScanner(tx1, rowId12Scan);
+        res = tx2Scanner.next();
+        count = 0;
+        while (res != null) {
+            LOG.info("RESSS {}", res);
+            LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()), 
Bytes.toInt(res.getValue(famName, colName)));
+            switch (count) {
+                case 0:
+                    assertEquals(res.getRow(), rowId1);
+                    assertEquals(res.getValue(famName, colName), dataValue1);
+                    break;
+                case 1:
+                    assertEquals(res.getRow(), rowId2);
+                    assertEquals(res.getValue(famName, colName), dataValue2);
+                    break;
+                default:
+                    fail();
+            }
+            res = tx2Scanner.next();
+            count++;
+        }
+        assertEquals(count, 2);
+
+        // 3) update test set value = 11 where id = 1; -- T1
+        Put updateRow1Tx1 = new Put(rowId1);
+        updateRow1Tx1.add(famName, colName, Bytes.toBytes("11"));
+        txTable.put(tx1, updateRow1Tx1);
+
+        // 4) update test set value = 21 where id = 2; -- T2
+        Put updateRow2Tx2 = new Put(rowId2);
+        updateRow2Tx2.add(famName, colName, Bytes.toBytes("21"));
+        txTable.put(tx2, updateRow2Tx2);
+
+        // 5) commit; -- T1
+        tm.commit(tx1);
+
+        // 6) commit; -- T2
+        tm.commit(tx2);
+    }
+
+    // this test shows that Omid does not provide serilizable level of 
isolation other wise last commit would have failed
+    @Test
+    public void testSIDoesNotPreventAntiDependencyCycles(ITestContext context) 
throws Exception {
+        // TX History for G2:
+        // begin; set transaction isolation level repeatable read; -- T1
+        // begin; set transaction isolation level repeatable read; -- T2
+        // select * from test where value % 3 = 0; -- T1
+        // select * from test where value % 3 = 0; -- T2
+        // insert into test (id, value) values(3, 30); -- T1
+        // insert into test (id, value) values(4, 42); -- T2
+        // commit; -- T1
+        // commit; -- T2
+        // select * from test where value % 3 = 0; -- Either. Returns 3 => 30, 
4 => 42
+
+        // 0) Start transactions
+        TransactionManager tm = newTransactionManager(context);
+        TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+        Transaction tx1 = tm.begin();
+        Transaction tx2 = tm.begin();
+
+        Filter f = new SingleColumnValueFilter(famName, colName, 
CompareFilter.CompareOp.EQUAL, Bytes.toBytes("30"));
+        Scan value30 = new Scan();
+        value30.setFilter(f);
+        value30.addColumn(famName, colName);
+
+        // 1) select * from test where value % 3 = 0; -- T1
+        assertNumberOfRows(txTable, tx1, 0, value30);
+
+
+        // 2) select * from test where value % 3 = 0; -- T2
+        assertNumberOfRows(txTable, tx2, 0, value30);
+
+
+        // 3) insert into test (id, value) values(3, 30); -- T1
+        Put insertRow3Tx1 = new Put(rowId1);
+        insertRow3Tx1.add(famName, colName, Bytes.toBytes("30"));
+        txTable.put(tx1, insertRow3Tx1);
+
+        // 4) insert into test (id, value) values(4, 42); -- T2
+        Put updateRow4Tx2 = new Put(rowId2);
+        updateRow4Tx2.add(famName, colName, Bytes.toBytes("42"));
+        txTable.put(tx2, updateRow4Tx2);
+
+        // 5) commit; -- T1
+        tm.commit(tx1);
+
+        // 6) commit; -- T2
+        tm.commit(tx2);
+
+        // 7) select * from test where value % 3 = 0; -- Either. Returns 3 => 
30, 4 => 42
+    }
+
+    /**
+     * This translates the table initialization done in:
+     * https://github.com/ept/hermitage/blob/master/postgres.md
+     *
+     * create table test (id int primary key, value int);
+     * insert into test (id, value) values (1, 10), (2, 20);
+     */
+    @BeforeMethod(alwaysRun = true)
+    private void loadBaseDataOnTestTable(ITestContext context) throws 
Exception {
+
+        TransactionManager tm = newTransactionManager(context);
+        TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+
+        Transaction initializationTx = tm.begin();
+        Put row1 = new Put(rowId1);
+        row1.add(famName, colName, dataValue1);
+        txTable.put(initializationTx, row1);
+        Put row2 = new Put(rowId2);
+        row2.add(famName, colName, dataValue2);
+        txTable.put(initializationTx, row2);
+
+        tm.commit(initializationTx);
+    }
+
+
+    private void assertNumberOfRows(TTable txTable, Transaction tx2, int 
maxCount, Scan scan) throws IOException {
+        int count = 0;
+        ResultScanner tx2Scanner = txTable.getScanner(tx2, scan);
+        Result res = tx2Scanner.next();
+        while (res != null) {
+            LOG.info("RESSS {}", res);
+            LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()), 
Bytes.toInt(res.getValue(famName, colName)));
+            res = tx2Scanner.next();
+            count++;
+        }
+        assertEquals(count, maxCount);
+    }
+
+
+}

Reply via email to