http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/hbase-client/src/test/java/com/yahoo/omid/transaction/TestUpdateScan.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/com/yahoo/omid/transaction/TestUpdateScan.java b/hbase-client/src/test/java/com/yahoo/omid/transaction/TestUpdateScan.java deleted file mode 100644 index 8394a01..0000000 --- a/hbase-client/src/test/java/com/yahoo/omid/transaction/TestUpdateScan.java +++ /dev/null @@ -1,218 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.yahoo.omid.transaction; - -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; -import org.apache.hadoop.hbase.filter.CompareFilter; -import org.apache.hadoop.hbase.filter.FilterList; -import org.apache.hadoop.hbase.filter.RowFilter; -import org.apache.hadoop.hbase.filter.WhileMatchFilter; -import org.apache.hadoop.hbase.util.Bytes; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.Assert; -import org.testng.ITestContext; -import org.testng.annotations.Test; - -import static org.testng.AssertJUnit.assertEquals; -import static org.testng.AssertJUnit.assertTrue; - -@Test(groups = "sharedHBase") -public class TestUpdateScan extends OmidTestBase { - private static final Logger LOG = LoggerFactory.getLogger(TestUpdateScan.class); - - private static final String TEST_COL = "value"; - private static final String TEST_COL_2 = "col_2"; - - @Test - public void testGet(ITestContext context) throws Exception { - try { - TransactionManager tm = newTransactionManager(context); - TTable table = new TTable(hbaseConf, TEST_TABLE); - Transaction t = tm.begin(); - int[] lInts = new int[]{100, 243, 2342, 22, 1, 5, 43, 56}; - for (int i = 0; i < lInts.length; i++) { - byte[] data = Bytes.toBytes(lInts[i]); - Put put = new Put(data); - put.add(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL), data); - table.put(t, put); - } - int startKeyValue = lInts[3]; - int stopKeyValue = lInts[3]; - byte[] startKey = Bytes.toBytes(startKeyValue); - byte[] stopKey = Bytes.toBytes(stopKeyValue); - Get g = new Get(startKey); - Result r = table.get(t, g); - if (!r.isEmpty()) { - int tmp = Bytes.toInt(r.getValue(Bytes.toBytes(TEST_FAMILY), - Bytes.toBytes(TEST_COL))); - LOG.info("Result:" + tmp); - assertTrue("Bad value, should be " - + startKeyValue + " but is " + tmp - , tmp == startKeyValue); - } else { - Assert.fail("Bad result"); - } - tm.commit(t); - - Scan s = new Scan(startKey); - CompareFilter.CompareOp op = CompareFilter.CompareOp.LESS_OR_EQUAL; - RowFilter toFilter = new RowFilter(op, new BinaryPrefixComparator(stopKey)); - boolean startInclusive = true; - if (!startInclusive) { - FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL); - filters.addFilter(new RowFilter(CompareFilter.CompareOp.GREATER, - new BinaryPrefixComparator(startKey))); - filters.addFilter(new WhileMatchFilter(toFilter)); - s.setFilter(filters); - } else { - s.setFilter(new WhileMatchFilter(toFilter)); - } - t = tm.begin(); - ResultScanner res = table.getScanner(t, s); - Result rr; - int count = 0; - while ((rr = res.next()) != null) { - int iTmp = Bytes.toInt(rr.getValue(Bytes.toBytes(TEST_FAMILY), - Bytes.toBytes(TEST_COL))); - LOG.info("Result: " + iTmp); - count++; - } - assertEquals("Count is wrong", 1, count); - LOG.info("Rows found " + count); - tm.commit(t); - table.close(); - } catch (Exception e) { - LOG.error("Exception in test", e); - } - } - - @Test - public void testScan(ITestContext context) throws Exception { - - try (TTable table = new TTable(hbaseConf, TEST_TABLE)) { - TransactionManager tm = newTransactionManager(context); - Transaction t = tm.begin(); - int[] lInts = new int[]{100, 243, 2342, 22, 1, 5, 43, 56}; - for (int lInt : lInts) { - byte[] data = Bytes.toBytes(lInt); - Put put = new Put(data); - put.add(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL), data); - put.add(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL_2), data); - table.put(t, put); - } - - Scan s = new Scan(); - // Adding two columns to the scanner should not throw a - // ConcurrentModificationException when getting the scanner - s.addColumn(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL)); - s.addColumn(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL_2)); - ResultScanner res = table.getScanner(t, s); - Result rr; - int count = 0; - while ((rr = res.next()) != null) { - int iTmp = Bytes.toInt(rr.getValue(Bytes.toBytes(TEST_FAMILY), - Bytes.toBytes(TEST_COL))); - LOG.info("Result: " + iTmp); - count++; - } - assertTrue("Count should be " + lInts.length + " but is " + count, - count == lInts.length); - LOG.info("Rows found " + count); - - tm.commit(t); - - t = tm.begin(); - res = table.getScanner(t, s); - count = 0; - while ((rr = res.next()) != null) { - int iTmp = Bytes.toInt(rr.getValue(Bytes.toBytes(TEST_FAMILY), - Bytes.toBytes(TEST_COL))); - LOG.info("Result: " + iTmp); - count++; - } - assertTrue("Count should be " + lInts.length + " but is " + count, - count == lInts.length); - LOG.info("Rows found " + count); - tm.commit(t); - } - - } - - - @Test - public void testScanUncommitted(ITestContext context) throws Exception { - try { - TransactionManager tm = newTransactionManager(context); - TTable table = new TTable(hbaseConf, TEST_TABLE); - Transaction t = tm.begin(); - int[] lIntsA = new int[]{100, 243, 2342, 22, 1, 5, 43, 56}; - for (int aLIntsA : lIntsA) { - byte[] data = Bytes.toBytes(aLIntsA); - Put put = new Put(data); - put.add(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL), data); - table.put(t, put); - } - tm.commit(t); - - Transaction tu = tm.begin(); - int[] lIntsB = new int[]{105, 24, 4342, 32, 7, 3, 30, 40}; - for (int aLIntsB : lIntsB) { - byte[] data = Bytes.toBytes(aLIntsB); - Put put = new Put(data); - put.add(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL), data); - table.put(tu, put); - } - - t = tm.begin(); - int[] lIntsC = new int[]{109, 224, 242, 2, 16, 59, 23, 26}; - for (int aLIntsC : lIntsC) { - byte[] data = Bytes.toBytes(aLIntsC); - Put put = new Put(data); - put.add(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL), data); - table.put(t, put); - } - tm.commit(t); - - t = tm.begin(); - Scan s = new Scan(); - ResultScanner res = table.getScanner(t, s); - Result rr; - int count = 0; - - while ((rr = res.next()) != null) { - int iTmp = Bytes.toInt(rr.getValue(Bytes.toBytes(TEST_FAMILY), - Bytes.toBytes(TEST_COL))); - LOG.info("Result: " + iTmp); - count++; - } - assertTrue("Count should be " + (lIntsA.length * lIntsC.length) + " but is " + count, - count == lIntsA.length + lIntsC.length); - LOG.info("Rows found " + count); - tm.commit(t); - table.close(); - } catch (Exception e) { - LOG.error("Exception in test", e); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java b/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java new file mode 100644 index 0000000..e4c4614 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java @@ -0,0 +1,257 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.omid.transaction; + +import com.google.inject.Guice; +import com.google.inject.Injector; +import org.apache.omid.TestUtils; +import org.apache.omid.committable.CommitTable; +import org.apache.omid.committable.InMemoryCommitTable; +import org.apache.omid.committable.hbase.HBaseCommitTableConfig; +import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig; +import org.apache.omid.tools.hbase.OmidTableManager; +import org.apache.omid.tso.TSOMockModule; +import org.apache.omid.tso.TSOServer; +import org.apache.omid.tso.TSOServerConfig; +import org.apache.omid.tso.client.OmidClientConfiguration; +import org.apache.omid.tso.client.TSOClient; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.ITestContext; +import org.testng.annotations.AfterGroups; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeGroups; +import org.testng.annotations.BeforeMethod; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Method; + +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER; + +public abstract class OmidTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(OmidTestBase.class); + + static HBaseTestingUtility hBaseUtils; + private static MiniHBaseCluster hbaseCluster; + static Configuration hbaseConf; + + protected static final String TEST_TABLE = "test"; + protected static final String TEST_FAMILY = "data"; + static final String TEST_FAMILY2 = "data2"; + private HBaseCommitTableConfig hBaseCommitTableConfig; + + @BeforeMethod(alwaysRun = true) + public void beforeClass(Method method) throws Exception { + Thread.currentThread().setName("UnitTest-" + method.getName()); + } + + + @BeforeGroups(groups = "sharedHBase") + public void beforeGroups(ITestContext context) throws Exception { + // TSO Setup + TSOServerConfig tsoConfig = new TSOServerConfig(); + tsoConfig.setPort(1234); + tsoConfig.setMaxItems(1000); + Injector injector = Guice.createInjector(new TSOMockModule(tsoConfig)); + LOG.info("Starting TSO"); + TSOServer tso = injector.getInstance(TSOServer.class); + hBaseCommitTableConfig = injector.getInstance(HBaseCommitTableConfig.class); + HBaseTimestampStorageConfig hBaseTimestampStorageConfig = injector.getInstance(HBaseTimestampStorageConfig.class); + tso.startAndWait(); + TestUtils.waitForSocketListening("localhost", 1234, 100); + LOG.info("Finished loading TSO"); + context.setAttribute("tso", tso); + + OmidClientConfiguration clientConf = new OmidClientConfiguration(); + clientConf.setConnectionString("localhost:1234"); + context.setAttribute("clientConf", clientConf); + + InMemoryCommitTable commitTable = (InMemoryCommitTable) injector.getInstance(CommitTable.class); + context.setAttribute("commitTable", commitTable); + + // Create the associated Handler + TSOClient client = TSOClient.newInstance(clientConf); + context.setAttribute("client", client); + + // ------------------------------------------------------------------------------------------------------------ + // HBase setup + // ------------------------------------------------------------------------------------------------------------ + LOG.info("Creating HBase minicluster"); + hbaseConf = HBaseConfiguration.create(); + hbaseConf.setInt("hbase.hregion.memstore.flush.size", 10_000 * 1024); + hbaseConf.setInt("hbase.regionserver.nbreservationblocks", 1); + hbaseConf.setInt(HBASE_CLIENT_RETRIES_NUMBER, 3); + + File tempFile = File.createTempFile("OmidTest", ""); + tempFile.deleteOnExit(); + hbaseConf.set("hbase.rootdir", tempFile.getAbsolutePath()); + + hBaseUtils = new HBaseTestingUtility(hbaseConf); + hbaseCluster = hBaseUtils.startMiniCluster(1); + hBaseUtils.createTable(Bytes.toBytes(hBaseTimestampStorageConfig.getTableName()), + new byte[][]{hBaseTimestampStorageConfig.getFamilyName().getBytes()}, + Integer.MAX_VALUE); + + createTestTable(); + createCommitTable(); + + LOG.info("HBase minicluster is up"); + } + + private void createTestTable() throws IOException { + HBaseAdmin admin = hBaseUtils.getHBaseAdmin(); + HTableDescriptor test_table_desc = new HTableDescriptor(TableName.valueOf(TEST_TABLE)); + HColumnDescriptor datafam = new HColumnDescriptor(TEST_FAMILY); + HColumnDescriptor datafam2 = new HColumnDescriptor(TEST_FAMILY2); + datafam.setMaxVersions(Integer.MAX_VALUE); + datafam2.setMaxVersions(Integer.MAX_VALUE); + test_table_desc.addFamily(datafam); + test_table_desc.addFamily(datafam2); + admin.createTable(test_table_desc); + } + + private void createCommitTable() throws IOException { + String[] args = new String[]{OmidTableManager.COMMIT_TABLE_COMMAND_NAME, "-numRegions", "1"}; + OmidTableManager omidTableManager = new OmidTableManager(args); + omidTableManager.executeActionsOnHBase(hbaseConf); + } + + + private TSOServer getTSO(ITestContext context) { + return (TSOServer) context.getAttribute("tso"); + } + + + TSOClient getClient(ITestContext context) { + return (TSOClient) context.getAttribute("client"); + } + + InMemoryCommitTable getCommitTable(ITestContext context) { + return (InMemoryCommitTable) context.getAttribute("commitTable"); + } + + protected TransactionManager newTransactionManager(ITestContext context) throws Exception { + return newTransactionManager(context, getClient(context)); + } + + protected TransactionManager newTransactionManager(ITestContext context, PostCommitActions postCommitActions) throws Exception { + HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration(); + clientConf.setConnectionString("localhost:1234"); + clientConf.setHBaseConfiguration(hbaseConf); + return HBaseTransactionManager.builder(clientConf) + .postCommitter(postCommitActions) + .commitTableClient(getCommitTable(context).getClient()) + .tsoClient(getClient(context)).build(); + } + + protected TransactionManager newTransactionManager(ITestContext context, TSOClient tsoClient) throws Exception { + HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration(); + clientConf.setConnectionString("localhost:1234"); + clientConf.setHBaseConfiguration(hbaseConf); + return HBaseTransactionManager.builder(clientConf) + .commitTableClient(getCommitTable(context).getClient()) + .tsoClient(tsoClient).build(); + } + + protected TransactionManager newTransactionManager(ITestContext context, CommitTable.Client commitTableClient) + throws Exception { + HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration(); + clientConf.setConnectionString("localhost:1234"); + clientConf.setHBaseConfiguration(hbaseConf); + return HBaseTransactionManager.builder(clientConf) + .commitTableClient(commitTableClient) + .tsoClient(getClient(context)).build(); + } + + @AfterGroups(groups = "sharedHBase") + public void afterGroups(ITestContext context) throws Exception { + LOG.info("Tearing down OmidTestBase..."); + if (hbaseCluster != null) { + hBaseUtils.shutdownMiniCluster(); + } + + getClient(context).close().get(); + getTSO(context).stopAndWait(); + TestUtils.waitForSocketNotListening("localhost", 1234, 1000); + } + + @AfterMethod(groups = "sharedHBase", timeOut = 60_000) + public void afterMethod() { + try { + LOG.info("tearing Down"); + HBaseAdmin admin = hBaseUtils.getHBaseAdmin(); + deleteTable(admin, TableName.valueOf(TEST_TABLE)); + createTestTable(); + deleteTable(admin, TableName.valueOf(hBaseCommitTableConfig.getTableName())); + createCommitTable(); + } catch (Exception e) { + LOG.error("Error tearing down", e); + } + } + + void deleteTable(HBaseAdmin admin, TableName tableName) throws IOException { + if (admin.tableExists(tableName)) { + if (admin.isTableDisabled(tableName)) { + admin.deleteTable(tableName); + } else { + admin.disableTable(tableName); + admin.deleteTable(tableName); + } + } + } + + static boolean verifyValue(byte[] tableName, byte[] row, + byte[] fam, byte[] col, byte[] value) { + + try (HTable table = new HTable(hbaseConf, tableName)) { + Get g = new Get(row).setMaxVersions(1); + Result r = table.get(g); + Cell cell = r.getColumnLatestCell(fam, col); + + if (LOG.isTraceEnabled()) { + LOG.trace("Value for " + Bytes.toString(tableName) + ":" + + Bytes.toString(row) + ":" + Bytes.toString(fam) + + Bytes.toString(col) + "=>" + Bytes.toString(CellUtil.cloneValue(cell)) + + " (" + Bytes.toString(value) + " expected)"); + } + + return Bytes.equals(CellUtil.cloneValue(cell), value); + } catch (IOException e) { + LOG.error("Error reading row " + Bytes.toString(tableName) + ":" + + Bytes.toString(row) + ":" + Bytes.toString(fam) + + Bytes.toString(col), e); + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/hbase-client/src/test/java/org/apache/omid/transaction/TestAsynchronousPostCommitter.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestAsynchronousPostCommitter.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestAsynchronousPostCommitter.java new file mode 100644 index 0000000..21abacf --- /dev/null +++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestAsynchronousPostCommitter.java @@ -0,0 +1,323 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.omid.transaction; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.omid.committable.CommitTable; +import org.apache.omid.metrics.NullMetricsProvider; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.ITestContext; +import org.testng.annotations.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +@Test(groups = "sharedHBase") +public class TestAsynchronousPostCommitter extends OmidTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(TestAsynchronousPostCommitter.class); + + private static final byte[] family = Bytes.toBytes(TEST_FAMILY); + private static final byte[] nonExistentFamily = Bytes.toBytes("non-existent"); + private static final byte[] qualifier = Bytes.toBytes("test-qual"); + + byte[] row1 = Bytes.toBytes("test-is-committed1"); + byte[] row2 = Bytes.toBytes("test-is-committed2"); + + @Test(timeOut = 30_000) + public void testPostCommitActionsAreCalledAsynchronously(ITestContext context) throws Exception { + + CommitTable.Client commitTableClient = getCommitTable(context).getClient(); + + PostCommitActions syncPostCommitter = + spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient)); + ListeningExecutorService postCommitExecutor = + MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setNameFormat("postCommit-%d").build())); + PostCommitActions asyncPostCommitter = new HBaseAsyncPostCommitter(syncPostCommitter, postCommitExecutor); + + TransactionManager tm = newTransactionManager(context, asyncPostCommitter); + + final CountDownLatch beforeUpdatingShadowCellsLatch = new CountDownLatch(1); + final CountDownLatch afterUpdatingShadowCellsLatch = new CountDownLatch(1); + final CountDownLatch beforeRemovingCTEntryLatch = new CountDownLatch(1); + final CountDownLatch afterRemovingCTEntryLatch = new CountDownLatch(1); + + doAnswer(new Answer<ListenableFuture<Void>>() { + public ListenableFuture<Void> answer(InvocationOnMock invocation) { + try { + beforeUpdatingShadowCellsLatch.await(); + invocation.callRealMethod(); + afterUpdatingShadowCellsLatch.countDown(); + } catch (Throwable throwable) { + throwable.printStackTrace(); + } + return SettableFuture.create(); + } + }).when(syncPostCommitter).updateShadowCells(any(AbstractTransaction.class)); + + doAnswer(new Answer<ListenableFuture<Void>>() { + public ListenableFuture<Void> answer(InvocationOnMock invocation) { + try { + beforeRemovingCTEntryLatch.await(); + LOG.info("We are here"); + invocation.callRealMethod(); + afterRemovingCTEntryLatch.countDown(); + } catch (Throwable throwable) { + throwable.printStackTrace(); + } + return SettableFuture.create(); + } + }).when(syncPostCommitter).removeCommitTableEntry(any(AbstractTransaction.class)); + + try (TTable txTable = new TTable(hbaseConf, TEST_TABLE)) { + + // Execute tx with async post commit actions + Transaction tx1 = tm.begin(); + + Put put1 = new Put(row1); + put1.add(family, qualifier, Bytes.toBytes("hey!")); + txTable.put(tx1, put1); + Put put2 = new Put(row2); + put2.add(family, qualifier, Bytes.toBytes("hou!")); + txTable.put(tx1, put2); + + tm.commit(tx1); + + long tx1Id = tx1.getTransactionId(); + + // As we have paused the update of shadow cells, the shadow cells shouldn't be there yet + assertFalse(CellUtils.hasShadowCell(row1, family, qualifier, tx1Id, new TTableCellGetterAdapter(txTable))); + assertFalse(CellUtils.hasShadowCell(row2, family, qualifier, tx1Id, new TTableCellGetterAdapter(txTable))); + + // Commit Table should contain an entry for the transaction + Optional<CommitTable.CommitTimestamp> commitTimestamp = commitTableClient.getCommitTimestamp(tx1Id).get(); + assertTrue(commitTimestamp.isPresent()); + assertTrue(commitTimestamp.get().isValid()); + assertEquals(commitTimestamp.get().getValue(), ((AbstractTransaction) tx1).getCommitTimestamp()); + + // Read from row1 and row2 in a different Tx and check that result is the data written by tx1 despite the + // post commit actions have not been executed yet (the shadow cells healing process should make its work) + Transaction tx2 = tm.begin(); + Get get1 = new Get(row1); + Result result = txTable.get(tx2, get1); + byte[] value = result.getValue(family, qualifier); + assertNotNull(value); + assertEquals("hey!", Bytes.toString(value)); + + Get get2 = new Get(row2); + result = txTable.get(tx2, get2); + value = result.getValue(family, qualifier); + assertNotNull(value); + assertEquals("hou!", Bytes.toString(value)); + + // Then, we continue with the update of shadow cells and we wait till completed + beforeUpdatingShadowCellsLatch.countDown(); + afterUpdatingShadowCellsLatch.await(); + + // Now we can check that the shadow cells are there... + verify(syncPostCommitter, times(1)).updateShadowCells(any(AbstractTransaction.class)); + assertTrue(CellUtils.hasShadowCell(row1, family, qualifier, tx1Id, new TTableCellGetterAdapter(txTable))); + assertTrue(CellUtils.hasShadowCell(row2, family, qualifier, tx1Id, new TTableCellGetterAdapter(txTable))); + // ...and the transaction entry is still in the Commit Table + commitTimestamp = commitTableClient.getCommitTimestamp(tx1Id).get(); + assertTrue(commitTimestamp.isPresent()); + assertTrue(commitTimestamp.get().isValid()); + assertEquals(commitTimestamp.get().getValue(), ((AbstractTransaction) tx1).getCommitTimestamp()); + + // Finally, we continue till the Commit Table cleaning process is done... + beforeRemovingCTEntryLatch.countDown(); + afterRemovingCTEntryLatch.await(); + + // ...so now, the Commit Table should NOT contain the entry for the transaction anymore + verify(syncPostCommitter, times(1)).removeCommitTableEntry(any(AbstractTransaction.class)); + commitTimestamp = commitTableClient.getCommitTimestamp(tx1Id).get(); + assertFalse(commitTimestamp.isPresent()); + + // Final checks + verify(syncPostCommitter, times(1)).updateShadowCells(any(AbstractTransaction.class)); + verify(syncPostCommitter, times(1)).removeCommitTableEntry(any(AbstractTransaction.class)); + + } + + } + + @Test(timeOut = 30_000) + public void testNoAsyncPostActionsAreCalled(ITestContext context) throws Exception { + + CommitTable.Client commitTableClient = getCommitTable(context).getClient(); + + PostCommitActions syncPostCommitter = + spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient)); + ListeningExecutorService postCommitExecutor = + MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setNameFormat("postCommit-%d").build())); + PostCommitActions asyncPostCommitter = new HBaseAsyncPostCommitter(syncPostCommitter, postCommitExecutor); + + TransactionManager tm = newTransactionManager(context, asyncPostCommitter); + + final CountDownLatch updateShadowCellsCalledLatch = new CountDownLatch(1); + final CountDownLatch removeCommitTableEntryCalledLatch = new CountDownLatch(1); + + // Simulate shadow cells are not updated and commit table is not clean + doAnswer(new Answer<Void>() { + public Void answer(InvocationOnMock invocation) { + // Do not invoke real method simulating a fail of the shadow cells update + updateShadowCellsCalledLatch.countDown(); + return null; + } + }).when(syncPostCommitter).updateShadowCells(any(AbstractTransaction.class)); + + doAnswer(new Answer<Void>() { + public Void answer(InvocationOnMock invocation) { + // Do not invoke real method simulating a fail of the async clean of commit table entry + removeCommitTableEntryCalledLatch.countDown(); + return null; + } + }).when(syncPostCommitter).removeCommitTableEntry(any(AbstractTransaction.class)); + + + try (TTable txTable = new TTable(hbaseConf, TEST_TABLE)) { + + // Execute tx with async post commit actions + Transaction tx1 = tm.begin(); + + Put put1 = new Put(row1); + put1.add(family, qualifier, Bytes.toBytes("hey!")); + txTable.put(tx1, put1); + Put put2 = new Put(row2); + put2.add(family, qualifier, Bytes.toBytes("hou!")); + txTable.put(tx1, put2); + + tm.commit(tx1); + + long tx1Id = tx1.getTransactionId(); + + // The shadow cells shouldn't be there... + assertFalse(CellUtils.hasShadowCell(row1, family, qualifier, tx1Id, new TTableCellGetterAdapter(txTable))); + assertFalse(CellUtils.hasShadowCell(row2, family, qualifier, tx1Id, new TTableCellGetterAdapter(txTable))); + // ... and the should NOT have been cleaned + Optional<CommitTable.CommitTimestamp> commitTimestamp = commitTableClient.getCommitTimestamp(tx1Id).get(); + assertTrue(commitTimestamp.isPresent()); + assertTrue(commitTimestamp.get().isValid()); + + updateShadowCellsCalledLatch.await(); + + // Not even after waiting for the method call on the shadow cells update... + assertFalse(CellUtils.hasShadowCell(row1, family, qualifier, tx1Id, new TTableCellGetterAdapter(txTable))); + assertFalse(CellUtils.hasShadowCell(row2, family, qualifier, tx1Id, new TTableCellGetterAdapter(txTable))); + + removeCommitTableEntryCalledLatch.await(); + // ... and after waiting for the method call for cleaning the commit table entry + commitTimestamp = commitTableClient.getCommitTimestamp(tx1Id).get(); + assertTrue(commitTimestamp.isPresent()); + assertTrue(commitTimestamp.get().isValid()); + + // Final checks + verify(syncPostCommitter, times(1)).updateShadowCells(any(AbstractTransaction.class)); + verify(syncPostCommitter, times(1)).removeCommitTableEntry(any(AbstractTransaction.class)); + + } + + } + + @Test(timeOut = 30_000) + public void testOnlyShadowCellsUpdateIsExecuted(ITestContext context) throws Exception { + + CommitTable.Client commitTableClient = getCommitTable(context).getClient(); + + PostCommitActions syncPostCommitter = + spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient)); + ListeningExecutorService postCommitExecutor = + MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setNameFormat("postCommit-%d").build())); + PostCommitActions asyncPostCommitter = new HBaseAsyncPostCommitter(syncPostCommitter, postCommitExecutor); + + TransactionManager tm = newTransactionManager(context, asyncPostCommitter); + + final CountDownLatch removeCommitTableEntryCalledLatch = new CountDownLatch(1); + + doAnswer(new Answer<Void>() { + public Void answer(InvocationOnMock invocation) { + // Do not invoke real method simulating a fail of the async clean of commit table entry + removeCommitTableEntryCalledLatch.countDown(); + return null; + } + }).when(syncPostCommitter).removeCommitTableEntry(any(AbstractTransaction.class)); + + + try (TTable txTable = new TTable(hbaseConf, TEST_TABLE)) { + + // Execute tx with async post commit actions + Transaction tx1 = tm.begin(); + + Put put1 = new Put(row1); + put1.add(family, qualifier, Bytes.toBytes("hey!")); + txTable.put(tx1, put1); + Put put2 = new Put(row2); + put2.add(family, qualifier, Bytes.toBytes("hou!")); + txTable.put(tx1, put2); + + tm.commit(tx1); + + long tx1Id = tx1.getTransactionId(); + + // We continue when the unsuccessful call of the method for cleaning commit table has been invoked + removeCommitTableEntryCalledLatch.await(); + + // We check that the shadow cells are there (because the update of the shadow cells should precede + // the cleaning of the commit table entry) ... + assertTrue(CellUtils.hasShadowCell(row1, family, qualifier, tx1Id, new TTableCellGetterAdapter(txTable))); + assertTrue(CellUtils.hasShadowCell(row2, family, qualifier, tx1Id, new TTableCellGetterAdapter(txTable))); + + // ... and the commit table entry has NOT been cleaned + Optional<CommitTable.CommitTimestamp> commitTimestamp = commitTableClient.getCommitTimestamp(tx1Id).get(); + assertTrue(commitTimestamp.isPresent()); + assertTrue(commitTimestamp.get().isValid()); + + // Final checks + verify(syncPostCommitter, times(1)).updateShadowCells(any(AbstractTransaction.class)); + verify(syncPostCommitter, times(1)).removeCommitTableEntry(any(AbstractTransaction.class)); + + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/hbase-client/src/test/java/org/apache/omid/transaction/TestAutoFlush.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestAutoFlush.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestAutoFlush.java new file mode 100644 index 0000000..297dd01 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestAutoFlush.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.omid.transaction; + +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.testng.ITestContext; +import org.testng.annotations.Test; + +import static org.testng.AssertJUnit.assertEquals; + +@Test(groups = "sharedHBase") +public class TestAutoFlush extends OmidTestBase { + + @Test + public void testReadWithSeveralUncommitted(ITestContext context) throws Exception { + byte[] family = Bytes.toBytes(TEST_FAMILY); + byte[] row = Bytes.toBytes("row"); + byte[] col = Bytes.toBytes("col1"); + byte[] data = Bytes.toBytes("data"); + TransactionManager tm = newTransactionManager(context); + TTable table = new TTable(hbaseConf, TEST_TABLE); + + // Turn off autoflush + table.setAutoFlush(false); + + Transaction t = tm.begin(); + Put put = new Put(row); + put.add(family, col, data); + table.put(t, put); + + // Data shouldn't be in DB yet + Get get = new Get(row); + Result result = table.getHTable().get(get); + assertEquals("Writes are already in DB", 0, result.size()); + + tm.commit(t); + + // After commit, both the cell and shadow cell should be there. + // That's why we check for two elements in the test assertion + result = table.getHTable().get(get); + assertEquals("Writes were not flushed to DB", 2, result.size()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/hbase-client/src/test/java/org/apache/omid/transaction/TestBasicTransaction.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestBasicTransaction.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestBasicTransaction.java new file mode 100644 index 0000000..504710d --- /dev/null +++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestBasicTransaction.java @@ -0,0 +1,440 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.omid.transaction; + +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.ITestContext; +import org.testng.annotations.Test; + +import static org.junit.Assert.fail; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +@Test(groups = "sharedHBase") +public class TestBasicTransaction extends OmidTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(TestBasicTransaction.class); + + + @Test(timeOut = 30_000) + public void testTimestampsOfTwoRowsInstertedAfterCommitOfSingleTransactionAreEquals(ITestContext context) throws Exception { + + TransactionManager tm = newTransactionManager(context); + TTable tt = new TTable(hbaseConf, TEST_TABLE); + + byte[] rowName1 = Bytes.toBytes("row1"); + byte[] rowName2 = Bytes.toBytes("row2"); + byte[] famName1 = Bytes.toBytes(TEST_FAMILY); + byte[] colName1 = Bytes.toBytes("col1"); + byte[] dataValue1 = Bytes.toBytes("testWrite-1"); + byte[] dataValue2 = Bytes.toBytes("testWrite-2"); + + Transaction tx1 = tm.begin(); + + Put row1 = new Put(rowName1); + row1.add(famName1, colName1, dataValue1); + tt.put(tx1, row1); + Put row2 = new Put(rowName2); + row2.add(famName1, colName1, dataValue2); + tt.put(tx1, row2); + + tm.commit(tx1); + + tt.close(); + + // Checks + Get getResultRow1 = new Get(rowName1).setMaxVersions(1); + Result result1 = tt.getHTable().get(getResultRow1); + byte[] val1 = result1.getValue(famName1, colName1); + assertTrue(Bytes.equals(dataValue1, result1.getValue(famName1, colName1)), + "Unexpected value for row 1 in col 1: " + Bytes.toString(val1)); + long tsRow1 = result1.rawCells()[0].getTimestamp(); + + Get getResultRow2 = new Get(rowName2).setMaxVersions(1); + Result result2 = tt.getHTable().get(getResultRow2); + byte[] val2 = result2.getValue(famName1, colName1); + assertTrue(Bytes.equals(dataValue2, result2.getValue(famName1, colName1)), + "Unexpected value for row 2 in col 1: " + Bytes.toString(val2)); + long tsRow2 = result2.rawCells()[0].getTimestamp(); + + assertEquals(tsRow2, tsRow1, "Timestamps of row 1 and row 2 are different"); + + } + + @Test(timeOut = 30_000) + public void testTimestampsOfTwoRowsModifiedByTwoSequentialTransactionsAreEqualAndHaveBeenIncreasedMonotonically(ITestContext context) + throws Exception { + + TransactionManager tm = newTransactionManager(context); + TTable tt = new TTable(hbaseConf, TEST_TABLE); + + byte[] rowName1 = Bytes.toBytes("row1"); + byte[] rowName2 = Bytes.toBytes("row2"); + byte[] famName1 = Bytes.toBytes(TEST_FAMILY); + byte[] colName1 = Bytes.toBytes("col1"); + byte[] dataValue1 = Bytes.toBytes("testWrite-1"); + byte[] dataValue2 = Bytes.toBytes("testWrite-2"); + + byte[] dataValue3 = Bytes.toBytes("testWrite-3"); + byte[] dataValue4 = Bytes.toBytes("testWrite-4"); + + Transaction tx1 = tm.begin(); + + Put row1 = new Put(rowName1); + row1.add(famName1, colName1, dataValue1); + tt.put(tx1, row1); + Put row2 = new Put(rowName2); + row2.add(famName1, colName1, dataValue2); + tt.put(tx1, row2); + + tm.commit(tx1); + + Transaction tx2 = tm.begin(); + + row1 = new Put(rowName1); + row1.add(famName1, colName1, dataValue3); + tt.put(tx2, row1); + row2 = new Put(rowName2); + row2.add(famName1, colName1, dataValue4); + tt.put(tx2, row2); + + tm.commit(tx2); + + tt.close(); + + // Checks + Get getResultRow1 = new Get(rowName1).setMaxVersions(2); + Result result1 = tt.getHTable().get(getResultRow1); + byte[] val1 = result1.getValue(famName1, colName1); + assertTrue(Bytes.equals(dataValue3, result1.getValue(famName1, colName1)), + "Unexpected value for row 1 in col 1: " + Bytes.toString(val1)); + + long lastTsRow1 = result1.rawCells()[0].getTimestamp(); + long previousTsRow1 = result1.rawCells()[1].getTimestamp(); + + Get getResultRow2 = new Get(rowName2).setMaxVersions(2); + Result result2 = tt.getHTable().get(getResultRow2); + byte[] val2 = result2.getValue(famName1, colName1); + assertTrue(Bytes.equals(dataValue4, result2.getValue(famName1, colName1)), + "Unexpected value for row 2 in col 1: " + Bytes.toString(val2)); + + long lastTsRow2 = result2.rawCells()[0].getTimestamp(); + long previousTsRow2 = result2.rawCells()[1].getTimestamp(); + + assertTrue(lastTsRow1 == lastTsRow2, "Timestamps assigned by Tx2 to row 1 and row 2 are different"); + assertTrue(previousTsRow1 == previousTsRow2, "Timestamps assigned by Tx2 to row 1 and row 2 are different"); + assertTrue(lastTsRow1 > previousTsRow1, "Timestamp assigned by Tx2 to row 1 hasn't increased monotonically"); + assertTrue(lastTsRow2 > previousTsRow2, "Timestamp assigned by Tx2 to row 2 hasn't increased monotonically"); + + } + + @Test(timeOut = 30_000) + public void runTestSimple(ITestContext context) throws Exception { + + TransactionManager tm = newTransactionManager(context); + + TTable tt = new TTable(hbaseConf, TEST_TABLE); + + Transaction t1 = tm.begin(); + LOG.info("Transaction created " + t1); + + byte[] row = Bytes.toBytes("test-simple"); + byte[] fam = Bytes.toBytes(TEST_FAMILY); + byte[] col = Bytes.toBytes("testdata"); + byte[] data1 = Bytes.toBytes("testWrite-1"); + byte[] data2 = Bytes.toBytes("testWrite-2"); + + Put p = new Put(row); + p.add(fam, col, data1); + tt.put(t1, p); + tm.commit(t1); + + Transaction tread = tm.begin(); + Transaction t2 = tm.begin(); + p = new Put(row); + p.add(fam, col, data2); + tt.put(t2, p); + tm.commit(t2); + + Get g = new Get(row).setMaxVersions(1); + Result r = tt.getHTable().get(g); + assertTrue(Bytes.equals(data2, r.getValue(fam, col)), + "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col))); + + r = tt.get(tread, g); + assertTrue(Bytes.equals(data1, r.getValue(fam, col)), + "Unexpected value for SI read " + tread + ": " + Bytes.toString(r.getValue(fam, col))); + } + + @Test(timeOut = 30_000) + public void runTestManyVersions(ITestContext context) throws Exception { + + TransactionManager tm = newTransactionManager(context); + TTable tt = new TTable(hbaseConf, TEST_TABLE); + + Transaction t1 = tm.begin(); + LOG.info("Transaction created " + t1); + + byte[] row = Bytes.toBytes("test-simple"); + byte[] fam = Bytes.toBytes(TEST_FAMILY); + byte[] col = Bytes.toBytes("testdata"); + byte[] data1 = Bytes.toBytes("testWrite-1"); + byte[] data2 = Bytes.toBytes("testWrite-2"); + + Put p = new Put(row); + p.add(fam, col, data1); + tt.put(t1, p); + tm.commit(t1); + + for (int i = 0; i < 5; ++i) { + Transaction t2 = tm.begin(); + p = new Put(row); + p.add(fam, col, data2); + tt.put(t2, p); + } + Transaction tread = tm.begin(); + + Get g = new Get(row).setMaxVersions(1); + Result r = tt.getHTable().get(g); + assertTrue(Bytes.equals(data2, r.getValue(fam, col)), + "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col))); + + r = tt.get(tread, g); + assertTrue(Bytes.equals(data1, r.getValue(fam, col)), + "Unexpected value for SI read " + tread + ": " + Bytes.toString(r.getValue(fam, col))); + + } + + @Test(timeOut = 30_000) + public void runTestInterleave(ITestContext context) throws Exception { + + TransactionManager tm = newTransactionManager(context); + TTable tt = new TTable(hbaseConf, TEST_TABLE); + + Transaction t1 = tm.begin(); + LOG.info("Transaction created " + t1); + + byte[] row = Bytes.toBytes("test-interleave"); + byte[] fam = Bytes.toBytes(TEST_FAMILY); + byte[] col = Bytes.toBytes("testdata"); + byte[] data1 = Bytes.toBytes("testWrite-1"); + byte[] data2 = Bytes.toBytes("testWrite-2"); + + Put p = new Put(row); + p.add(fam, col, data1); + tt.put(t1, p); + tm.commit(t1); + + Transaction t2 = tm.begin(); + p = new Put(row); + p.add(fam, col, data2); + tt.put(t2, p); + + Transaction tread = tm.begin(); + Get g = new Get(row).setMaxVersions(1); + Result r = tt.get(tread, g); + assertTrue(Bytes.equals(data1, r.getValue(fam, col)), + "Unexpected value for SI read " + tread + ": " + Bytes.toString(r.getValue(fam, col))); + tm.commit(t2); + + r = tt.getHTable().get(g); + assertTrue(Bytes.equals(data2, r.getValue(fam, col)), + "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col))); + + } + + @Test(expectedExceptions = IllegalArgumentException.class, timeOut = 30_000) + public void testSameCommitRaisesException(ITestContext context) throws Exception { + TransactionManager tm = newTransactionManager(context); + + Transaction t1 = tm.begin(); + tm.commit(t1); + tm.commit(t1); + } + + @Test(timeOut = 30_000) + public void testInterleavedScanReturnsTheRightSnapshotResults(ITestContext context) throws Exception { + + TransactionManager tm = newTransactionManager(context); + TTable txTable = new TTable(hbaseConf, TEST_TABLE); + + // Basic data-scaffolding for test + byte[] fam = Bytes.toBytes(TEST_FAMILY); + byte[] col = Bytes.toBytes("TEST_COL"); + byte[] data1 = Bytes.toBytes("testWrite-1"); + byte[] data2 = Bytes.toBytes("testWrite-2"); + + byte[] startRow = Bytes.toBytes("row-to-scan" + 0); + byte[] stopRow = Bytes.toBytes("row-to-scan" + 9); + byte[] randomRow = Bytes.toBytes("row-to-scan" + 3); + + // Add some data transactionally to have an initial state for the test + Transaction tx1 = tm.begin(); + for (int i = 0; i < 10; i++) { + byte[] row = Bytes.toBytes("row-to-scan" + i); + + Put p = new Put(row); + p.add(fam, col, data1); + txTable.put(tx1, p); + } + tm.commit(tx1); + + // Start a second transaction -Tx2- modifying a random row and check that a concurrent transactional context + // that scans the table, gets the proper snapshot with the stuff written by Tx1 + Transaction tx2 = tm.begin(); + Put p = new Put(randomRow); + p.add(fam, col, data2); + txTable.put(tx2, p); + + Transaction scanTx = tm.begin(); // This is the concurrent transactional scanner + ResultScanner rs = txTable.getScanner(scanTx, new Scan().setStartRow(startRow).setStopRow(stopRow)); + Result r = rs.next(); // Exercise the next() method + int i = 0; + while (r != null) { + LOG.trace("Scan (" + ++i + ")" + Bytes.toString(r.getRow()) + " => " + Bytes.toString(r.getValue(fam, col))); + assertTrue(Bytes.equals(data1, r.getValue(fam, col)), + "Unexpected value for SI scan " + scanTx + ": " + Bytes.toString(r.getValue(fam, col))); + r = rs.next(); + } + + // Commit the Tx2 and then check that under a new transactional context, the scanner gets the right snapshot, + // which must include the row modified by Tx2 + tm.commit(tx2); + + int modifiedRows = 0; + Transaction newScanTx = tm.begin(); + ResultScanner newRS = txTable.getScanner(newScanTx, new Scan().setStartRow(startRow).setStopRow(stopRow)); + Result[] results = newRS.next(10); // Exercise the next(numRows) method + for (Result result : results) { + if (Bytes.equals(data2, result.getValue(fam, col))) { + LOG.trace("Modified :" + Bytes.toString(result.getRow())); + modifiedRows++; + } + } + assertEquals(modifiedRows, 1, "Expected 1 row modified, but " + modifiedRows + " are."); + + // Same check as before but checking that the results are correct when retrieved through the Scanner Iterator + modifiedRows = 0; + ResultScanner iterableRS = txTable.getScanner(newScanTx, new Scan().setStartRow(startRow).setStopRow(stopRow)); + for (Result res : iterableRS) { + if (Bytes.equals(data2, res.getValue(fam, col))) { + LOG.trace("Modified :" + Bytes.toString(res.getRow())); + modifiedRows++; + } + } + + assertEquals(modifiedRows, 1, "Expected 1 row modified, but " + modifiedRows + " are."); + + // Finally, check that the Scanner Iterator does not implement the remove method + try { + iterableRS.iterator().remove(); + fail(); + } catch (RuntimeException re) { + // Expected + } + + } + + @Test(timeOut = 30_000) + public void testInterleavedScanReturnsTheRightSnapshotResultsWhenATransactionAborts(ITestContext context) + throws Exception { + + TransactionManager tm = newTransactionManager(context); + TTable txTable = new TTable(hbaseConf, TEST_TABLE); + + // Basic data-scaffolding for test + byte[] fam = Bytes.toBytes(TEST_FAMILY); + byte[] col = Bytes.toBytes("TEST_COL"); + byte[] data1 = Bytes.toBytes("testWrite-1"); + byte[] data2 = Bytes.toBytes("testWrite-2"); + + byte[] startRow = Bytes.toBytes("row-to-scan" + 0); + byte[] stopRow = Bytes.toBytes("row-to-scan" + 9); + byte[] randomRow = Bytes.toBytes("row-to-scan" + 3); + + // Add some data transactionally to have an initial state for the test + Transaction tx1 = tm.begin(); + for (int i = 0; i < 10; i++) { + byte[] row = Bytes.toBytes("row-to-scan" + i); + + Put p = new Put(row); + p.add(fam, col, data1); + txTable.put(tx1, p); + } + tm.commit(tx1); + + // Start a second transaction modifying a random row and check that a transactional scanner in Tx2 gets the + // right snapshot with the new value in the random row just written by Tx2 + Transaction tx2 = tm.begin(); + Put p = new Put(randomRow); + p.add(fam, col, data2); + txTable.put(tx2, p); + + int modifiedRows = 0; + ResultScanner rs = txTable.getScanner(tx2, new Scan().setStartRow(startRow).setStopRow(stopRow)); + Result r = rs.next(); + while (r != null) { + if (Bytes.equals(data2, r.getValue(fam, col))) { + LOG.trace("Modified :" + Bytes.toString(r.getRow())); + modifiedRows++; + } + + r = rs.next(); + } + + assertEquals(modifiedRows, 1, "Expected 1 row modified, but " + modifiedRows + " are."); + + // Rollback the second transaction and then check that under a new transactional scanner we get the snapshot + // that includes the only the initial rows put by Tx1 + tm.rollback(tx2); + + Transaction txScan = tm.begin(); + rs = txTable.getScanner(txScan, new Scan().setStartRow(startRow).setStopRow(stopRow)); + r = rs.next(); + while (r != null) { + LOG.trace("Scan1 :" + Bytes.toString(r.getRow()) + " => " + Bytes.toString(r.getValue(fam, col))); + assertTrue(Bytes.equals(data1, r.getValue(fam, col)), + "Unexpected value for SI scan " + txScan + ": " + Bytes.toString(r.getValue(fam, col))); + r = rs.next(); + } + + // Same check as before but checking that the results are correct when retrieved through the Scanner Iterator + ResultScanner iterableRS = txTable.getScanner(txScan, new Scan().setStartRow(startRow).setStopRow(stopRow)); + for (Result result : iterableRS) { + assertTrue(Bytes.equals(data1, result.getValue(fam, col)), + "Unexpected value for SI scan " + txScan + ": " + Bytes.toString(result.getValue(fam, col))); + } + + // Finally, check that the Scanner Iterator does not implement the remove method + try { + iterableRS.iterator().remove(); + fail(); + } catch (RuntimeException re) { + // Expected + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/hbase-client/src/test/java/org/apache/omid/transaction/TestCellUtils.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestCellUtils.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestCellUtils.java new file mode 100644 index 0000000..7801d0c --- /dev/null +++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestCellUtils.java @@ -0,0 +1,230 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.omid.transaction; + +import com.google.common.base.Optional; +import org.apache.omid.HBaseShims; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.util.Bytes; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.SortedMap; + +import static org.apache.omid.transaction.CellUtils.SHADOW_CELL_SUFFIX; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertFalse; +import static org.testng.AssertJUnit.assertTrue; +import static org.testng.AssertJUnit.fail; + +@Test(groups = "noHBase") +public class TestCellUtils { + + private final byte[] row = Bytes.toBytes("test-row"); + private final byte[] family = Bytes.toBytes("test-family"); + private final byte[] qualifier = Bytes.toBytes("test-qual"); + private final byte[] otherQualifier = Bytes.toBytes("other-test-qual"); + + @DataProvider(name = "shadow-cell-suffixes") + public Object[][] createShadowCellSuffixes() { + return new Object[][]{ + {SHADOW_CELL_SUFFIX}, + }; + } + + @Test(dataProvider = "shadow-cell-suffixes") + public void testShadowCellQualifiers(byte[] shadowCellSuffixToTest) throws IOException { + + final byte[] validShadowCellQualifier = + com.google.common.primitives.Bytes.concat(qualifier, shadowCellSuffixToTest); + final byte[] sandwichValidShadowCellQualifier = + com.google.common.primitives.Bytes.concat(shadowCellSuffixToTest, validShadowCellQualifier); + final byte[] doubleEndedValidShadowCellQualifier = + com.google.common.primitives.Bytes.concat(validShadowCellQualifier, shadowCellSuffixToTest); + final byte[] interleavedValidShadowCellQualifier = + com.google.common.primitives.Bytes.concat(validShadowCellQualifier, + com.google.common.primitives.Bytes + .concat(validShadowCellQualifier, validShadowCellQualifier)); + final byte[] value = Bytes.toBytes("test-value"); + + // Test the qualifier passed is a shadow cell + // qualifier because it contains only one suffix + // and is placed at the end of the qualifier: + // qual_nameSUFFIX + KeyValue kv = new KeyValue(row, family, validShadowCellQualifier, value); + assertTrue("Should include a valid shadowCell identifier", CellUtils.isShadowCell(kv)); + + // We also accept this pattern in the qualifier: + // SUFFIXqual_nameSUFFIX + kv = new KeyValue(row, family, sandwichValidShadowCellQualifier, value); + assertTrue("Should include a valid shadowCell identifier", CellUtils.isShadowCell(kv)); + + // We also accept this pattern in the qualifier: + // qual_nameSUFFIXSUFFIX + kv = new KeyValue(row, family, doubleEndedValidShadowCellQualifier, value); + assertTrue("Should include a valid shadowCell identifier", CellUtils.isShadowCell(kv)); + + // We also accept this pattern in the qualifier: + // qual_nameSUFFIXqual_nameSUFFIXqual_nameSUFFIX + kv = new KeyValue(row, family, interleavedValidShadowCellQualifier, value); + assertTrue("Should include a valid shadowCell identifier", CellUtils.isShadowCell(kv)); + + // Test the qualifier passed is not a shadow cell + // qualifier if there's nothing else apart from the suffix + kv = new KeyValue(row, family, shadowCellSuffixToTest, value); + assertFalse("Should not include a valid shadowCell identifier", CellUtils.isShadowCell(kv)); + + } + + @Test + public void testCorrectMapingOfCellsToShadowCells() throws IOException { + // Create the required data + final byte[] validShadowCellQualifier = + com.google.common.primitives.Bytes.concat(qualifier, SHADOW_CELL_SUFFIX); + + final byte[] qualifier2 = Bytes.toBytes("test-qual2"); + final byte[] validShadowCellQualifier2 = + com.google.common.primitives.Bytes.concat(qualifier2, SHADOW_CELL_SUFFIX); + + final byte[] qualifier3 = Bytes.toBytes("test-qual3"); + + Cell cell1 = new KeyValue(row, family, qualifier, 1, Bytes.toBytes("value")); // Default type is Put + Cell dupCell1 = new KeyValue(row, family, qualifier, 1, Bytes.toBytes("value")); // Default type is Put + Cell dupCell1WithAnotherValue = new KeyValue(row, family, qualifier, 1, Bytes.toBytes("other-value")); + Cell delCell1 = new KeyValue(row, family, qualifier, 1, Type.Delete, Bytes.toBytes("value")); + Cell shadowCell1 = new KeyValue(row, family, validShadowCellQualifier, 1, Bytes.toBytes("sc-value")); + + Cell cell2 = new KeyValue(row, family, qualifier2, 1, Bytes.toBytes("value2")); + Cell shadowCell2 = new KeyValue(row, family, validShadowCellQualifier2, 1, Bytes.toBytes("sc-value2")); + + Cell cell3 = new KeyValue(row, family, qualifier3, 1, Bytes.toBytes("value3")); + + // Check a list of cells with duplicate values + List<Cell> badListWithDups = new ArrayList<>(); + badListWithDups.add(cell1); + badListWithDups.add(dupCell1WithAnotherValue); + + // Check dup shadow cell with same MVCC is ignored + SortedMap<Cell, Optional<Cell>> cellsToShadowCells = CellUtils.mapCellsToShadowCells(badListWithDups); + assertEquals("There should be only 1 key-value maps", 1, cellsToShadowCells.size()); + assertTrue(cellsToShadowCells.containsKey(cell1)); + KeyValue firstKey = (KeyValue) cellsToShadowCells.firstKey(); + KeyValue lastKey = (KeyValue) cellsToShadowCells.lastKey(); + assertTrue(firstKey.equals(lastKey)); + assertTrue("Should be equal", 0 == Bytes.compareTo( + firstKey.getValueArray(), firstKey.getValueOffset(), firstKey.getValueLength(), + cell1.getValueArray(), cell1.getValueOffset(), cell1.getValueLength())); + + // Modify dup shadow cell to have a greater MVCC and check that is replaced + HBaseShims.setKeyValueSequenceId((KeyValue) dupCell1WithAnotherValue, 1); + cellsToShadowCells = CellUtils.mapCellsToShadowCells(badListWithDups); + assertEquals("There should be only 1 key-value maps", 1, cellsToShadowCells.size()); + assertTrue(cellsToShadowCells.containsKey(dupCell1WithAnotherValue)); + firstKey = (KeyValue) cellsToShadowCells.firstKey(); + lastKey = (KeyValue) cellsToShadowCells.lastKey(); + assertTrue(firstKey.equals(lastKey)); + assertTrue("Should be equal", 0 == Bytes.compareTo( + firstKey.getValueArray(), firstKey.getValueOffset(), firstKey.getValueLength(), + dupCell1WithAnotherValue.getValueArray(), dupCell1WithAnotherValue.getValueOffset(), + dupCell1WithAnotherValue.getValueLength())); + // Check a list of cells with duplicate values + List<Cell> cellListWithDups = new ArrayList<>(); + cellListWithDups.add(cell1); + cellListWithDups.add(shadowCell1); + cellListWithDups.add(dupCell1); // Dup cell + cellListWithDups.add(delCell1); // Another Dup cell but with different type + cellListWithDups.add(cell2); + cellListWithDups.add(cell3); + cellListWithDups.add(shadowCell2); + + cellsToShadowCells = CellUtils.mapCellsToShadowCells(cellListWithDups); + assertEquals("There should be only 3 key-value maps", 3, cellsToShadowCells.size()); + assertTrue(cellsToShadowCells.get(cell1).get().equals(shadowCell1)); + assertTrue(cellsToShadowCells.get(dupCell1).get().equals(shadowCell1)); + assertFalse(cellsToShadowCells.containsKey(delCell1)); // TODO This is strange and needs to be solved. + // The current algo avoids to put the delete cell + // as key after the put cell with same value was added + assertTrue(cellsToShadowCells.get(cell2).get().equals(shadowCell2)); + assertTrue(cellsToShadowCells.get(cell3).equals(Optional.absent())); + + } + + @Test + public void testShadowCellSuffixConcatenationToQualifier() { + + Cell cell = new KeyValue(row, family, qualifier, 1, Bytes.toBytes("value")); + byte[] suffixedQualifier = CellUtils.addShadowCellSuffix(cell.getQualifierArray(), + cell.getQualifierOffset(), + cell.getQualifierLength()); + byte[] expectedQualifier = com.google.common.primitives.Bytes.concat(qualifier, SHADOW_CELL_SUFFIX); + assertEquals(expectedQualifier, suffixedQualifier); + + } + + @Test(dataProvider = "shadow-cell-suffixes") + public void testShadowCellSuffixRemovalFromQualifier(byte[] shadowCellSuffixToTest) throws IOException { + + // Test removal from a correclty suffixed qualifier + byte[] suffixedQualifier = com.google.common.primitives.Bytes.concat(qualifier, shadowCellSuffixToTest); + Cell cell = new KeyValue(row, family, suffixedQualifier, 1, Bytes.toBytes("value")); + byte[] resultedQualifier = CellUtils.removeShadowCellSuffix(cell.getQualifierArray(), + cell.getQualifierOffset(), + cell.getQualifierLength()); + byte[] expectedQualifier = qualifier; + assertEquals(expectedQualifier, resultedQualifier); + + // Test removal from a badly suffixed qualifier + byte[] badlySuffixedQualifier = com.google.common.primitives.Bytes.concat(qualifier, Bytes.toBytes("BAD")); + Cell badCell = new KeyValue(row, family, badlySuffixedQualifier, 1, Bytes.toBytes("value")); + try { + CellUtils.removeShadowCellSuffix(badCell.getQualifierArray(), + badCell.getQualifierOffset(), + badCell.getQualifierLength()); + fail(); + } catch (IllegalArgumentException e) { + // Expected + } + } + + @Test + public void testMatchingQualifiers() { + Cell cell = new KeyValue(row, family, qualifier, 1, Bytes.toBytes("value")); + assertTrue(CellUtils.matchingQualifier(cell, qualifier, 0, qualifier.length)); + assertFalse(CellUtils.matchingQualifier(cell, otherQualifier, 0, otherQualifier.length)); + } + + @Test(dataProvider = "shadow-cell-suffixes") + public void testQualifierLengthFromShadowCellQualifier(byte[] shadowCellSuffixToTest) { + // Test suffixed qualifier + byte[] suffixedQualifier = com.google.common.primitives.Bytes.concat(qualifier, shadowCellSuffixToTest); + int originalQualifierLength = + CellUtils.qualifierLengthFromShadowCellQualifier(suffixedQualifier, 0, suffixedQualifier.length); + assertEquals(qualifier.length, originalQualifierLength); + + // Test passing qualifier without shadow cell suffix + originalQualifierLength = + CellUtils.qualifierLengthFromShadowCellQualifier(qualifier, 0, qualifier.length); + assertEquals(qualifier.length, originalQualifierLength); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/hbase-client/src/test/java/org/apache/omid/transaction/TestColumnIterator.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestColumnIterator.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestColumnIterator.java new file mode 100644 index 0000000..02287f4 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestColumnIterator.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.omid.transaction; + +import com.google.common.collect.ImmutableList; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.util.Bytes; +import org.mortbay.log.Log; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import static org.testng.AssertJUnit.assertEquals; + +@Test(groups = "noHBase") +public class TestColumnIterator { + + final byte[] row = Bytes.toBytes("row"); + private final byte[] family1 = Bytes.toBytes("f1"); + private final byte[] family2 = Bytes.toBytes("f2"); + private final byte[] qualifier1 = Bytes.toBytes("c1"); + private final byte[] qualifier2 = Bytes.toBytes("c2"); + final byte[] data = Bytes.toBytes("data"); + + private final List<Cell> cells = new ArrayList<Cell>( + Arrays.asList( + // Group 1 (3 elems but grouping should filter shadow cell, so check for 2) + new KeyValue(row, family1, qualifier1, 0, data), + new KeyValue(row, family1, qualifier1, 1, data), + new KeyValue(row, family1, CellUtils.addShadowCellSuffix(qualifier1), 0, data), + // Group 2 (2 elems but grouping should filter shadow cell, so check for 1) + new KeyValue(row, family1, qualifier2, 0, data), + new KeyValue(row, family1, CellUtils.addShadowCellSuffix(qualifier2), 0, data), + // Group 3 (2 elems but grouping should filter shadow cell, so check for 1) + new KeyValue(row, family2, qualifier1, 0, data), + new KeyValue(row, family2, CellUtils.addShadowCellSuffix(qualifier1), 0, data) + ) + ); + + @Test + public void testGroupingCellsByColumnFilteringShadowCells() { + + ImmutableList<Collection<Cell>> groupedColumnsWithoutShadowCells = + TTable.groupCellsByColumnFilteringShadowCells(cells); + Log.info("Column Groups " + groupedColumnsWithoutShadowCells); + assertEquals("Should be 3 column groups", 3, groupedColumnsWithoutShadowCells.size()); + int group1Counter = 0; + int group2Counter = 0; + int group3Counter = 0; + for (Collection<Cell> columns : groupedColumnsWithoutShadowCells) { + for (Cell cell : columns) { + byte[] cellFamily = CellUtil.cloneFamily(cell); + byte[] cellQualifier = CellUtil.cloneQualifier(cell); + // Group 1 + if (Bytes.equals(cellFamily, family1) && + Bytes.equals(cellQualifier, qualifier1)) { + group1Counter++; + } + // Group 2 + if (Bytes.equals(cellFamily, family1) && + Bytes.equals(cellQualifier, qualifier2)) { + group2Counter++; + } + // Group 3 + if (Bytes.equals(cellFamily, family2) && + Bytes.equals(cellQualifier, qualifier1)) { + group3Counter++; + } + } + } + + assertEquals("Group 1 should have 2 elems", 2, group1Counter); + assertEquals("Group 2 should have 1 elems", 1, group2Counter); + assertEquals("Group 3 should have 1 elems", 1, group3Counter); + } +} http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/hbase-client/src/test/java/org/apache/omid/transaction/TestDeletion.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestDeletion.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestDeletion.java new file mode 100644 index 0000000..201cbfb --- /dev/null +++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestDeletion.java @@ -0,0 +1,299 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.omid.transaction; + +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.AssertJUnit; +import org.testng.ITestContext; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.testng.Assert.assertTrue; + +@Test(groups = "sharedHBase") +public class TestDeletion extends OmidTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(TestDeletion.class); + + private byte[] famA = Bytes.toBytes(TEST_FAMILY); + private byte[] famB = Bytes.toBytes(TEST_FAMILY2); + private byte[] colA = Bytes.toBytes("testdataA"); + private byte[] colB = Bytes.toBytes("testdataB"); + private byte[] data1 = Bytes.toBytes("testWrite-1"); + private byte[] modrow = Bytes.toBytes("test-del" + 3); + + private static class FamCol { + + final byte[] fam; + final byte[] col; + + FamCol(byte[] fam, byte[] col) { + this.fam = fam; + this.col = col; + } + + } + + @Test + public void runTestDeleteFamily(ITestContext context) throws Exception { + + TransactionManager tm = newTransactionManager(context); + TTable tt = new TTable(hbaseConf, TEST_TABLE); + + Transaction t1 = tm.begin(); + LOG.info("Transaction created " + t1); + + int rowsWritten = 10; + FamCol famColA = new FamCol(famA, colA); + FamCol famColB = new FamCol(famB, colB); + writeRows(tt, t1, rowsWritten, famColA, famColB); + tm.commit(t1); + + Transaction t2 = tm.begin(); + Delete d = new Delete(modrow); + d.deleteFamily(famA); + tt.delete(t2, d); + + Transaction tscan = tm.begin(); + ResultScanner rs = tt.getScanner(tscan, new Scan()); + + Map<FamCol, Integer> count = countColsInRows(rs, famColA, famColB); + AssertJUnit.assertEquals("ColA count should be equal to rowsWritten", rowsWritten, (int) count.get(famColA)); + AssertJUnit.assertEquals("ColB count should be equal to rowsWritten", rowsWritten, (int) count.get(famColB)); + tm.commit(t2); + + tscan = tm.begin(); + rs = tt.getScanner(tscan, new Scan()); + + count = countColsInRows(rs, famColA, famColB); + AssertJUnit + .assertEquals("ColA count should be equal to rowsWritten - 1", (rowsWritten - 1), (int) count.get(famColA)); + AssertJUnit.assertEquals("ColB count should be equal to rowsWritten", rowsWritten, (int) count.get(famColB)); + } + + @Test + public void runTestDeleteColumn(ITestContext context) throws Exception { + + TransactionManager tm = newTransactionManager(context); + TTable tt = new TTable(hbaseConf, TEST_TABLE); + + Transaction t1 = tm.begin(); + LOG.info("Transaction created " + t1); + + int rowsWritten = 10; + + FamCol famColA = new FamCol(famA, colA); + FamCol famColB = new FamCol(famA, colB); + writeRows(tt, t1, rowsWritten, famColA, famColB); + tm.commit(t1); + + Transaction t2 = tm.begin(); + Delete d = new Delete(modrow); + d.deleteColumn(famA, colA); + tt.delete(t2, d); + + Transaction tscan = tm.begin(); + ResultScanner rs = tt.getScanner(tscan, new Scan()); + + Map<FamCol, Integer> count = countColsInRows(rs, famColA, famColB); + AssertJUnit.assertEquals("ColA count should be equal to rowsWritten", rowsWritten, (int) count.get(famColA)); + AssertJUnit.assertEquals("ColB count should be equal to rowsWritten", rowsWritten, (int) count.get(famColB)); + tm.commit(t2); + + tscan = tm.begin(); + rs = tt.getScanner(tscan, new Scan()); + + count = countColsInRows(rs, famColA, famColB); + AssertJUnit + .assertEquals("ColA count should be equal to rowsWritten - 1", (rowsWritten - 1), (int) count.get(famColA)); + AssertJUnit.assertEquals("ColB count should be equal to rowsWritten", rowsWritten, (int) count.get(famColB)); + } + + /** + * This test is very similar to #runTestDeleteColumn() but exercises Delete#deleteColumns() + */ + @Test + public void runTestDeleteColumns(ITestContext context) throws Exception { + + TransactionManager tm = newTransactionManager(context); + TTable tt = new TTable(hbaseConf, TEST_TABLE); + + Transaction t1 = tm.begin(); + LOG.info("Transaction created " + t1); + + int rowsWritten = 10; + + FamCol famColA = new FamCol(famA, colA); + FamCol famColB = new FamCol(famA, colB); + writeRows(tt, t1, rowsWritten, famColA, famColB); + tm.commit(t1); + + Transaction t2 = tm.begin(); + Delete d = new Delete(modrow); + d.deleteColumns(famA, colA); + tt.delete(t2, d); + + Transaction tscan = tm.begin(); + ResultScanner rs = tt.getScanner(tscan, new Scan()); + + Map<FamCol, Integer> count = countColsInRows(rs, famColA, famColB); + AssertJUnit.assertEquals("ColA count should be equal to rowsWritten", rowsWritten, (int) count.get(famColA)); + AssertJUnit.assertEquals("ColB count should be equal to rowsWritten", rowsWritten, (int) count.get(famColB)); + tm.commit(t2); + + tscan = tm.begin(); + rs = tt.getScanner(tscan, new Scan()); + + count = countColsInRows(rs, famColA, famColB); + + AssertJUnit + .assertEquals("ColA count should be equal to rowsWritten - 1", (rowsWritten - 1), (int) count.get(famColA)); + AssertJUnit.assertEquals("ColB count should be equal to rowsWritten", rowsWritten, (int) count.get(famColB)); + } + + @Test + public void runTestDeleteRow(ITestContext context) throws Exception { + TransactionManager tm = newTransactionManager(context); + TTable tt = new TTable(hbaseConf, TEST_TABLE); + + Transaction t1 = tm.begin(); + LOG.info("Transaction created " + t1); + + int rowsWritten = 10; + + FamCol famColA = new FamCol(famA, colA); + writeRows(tt, t1, rowsWritten, famColA); + + tm.commit(t1); + + Transaction t2 = tm.begin(); + Delete d = new Delete(modrow); + tt.delete(t2, d); + + Transaction tscan = tm.begin(); + ResultScanner rs = tt.getScanner(tscan, new Scan()); + + int rowsRead = countRows(rs); + AssertJUnit.assertTrue("Expected " + rowsWritten + " rows but " + rowsRead + " found", + rowsRead == rowsWritten); + + tm.commit(t2); + + tscan = tm.begin(); + rs = tt.getScanner(tscan, new Scan()); + + rowsRead = countRows(rs); + AssertJUnit.assertTrue("Expected " + (rowsWritten - 1) + " rows but " + rowsRead + " found", + rowsRead == (rowsWritten - 1)); + + } + + @Test + public void testDeletionOfNonExistingColumnFamilyDoesNotWriteToHBase(ITestContext context) throws Exception { + + // -------------------------------------------------------------------- + // Setup initial environment for the test + // -------------------------------------------------------------------- + TransactionManager tm = newTransactionManager(context); + TTable txTable = new TTable(hbaseConf, TEST_TABLE); + + Transaction tx1 = tm.begin(); + LOG.info("{} writing initial data created ", tx1); + Put p = new Put(Bytes.toBytes("row1")); + p.add(famA, colA, data1); + txTable.put(tx1, p); + tm.commit(tx1); + + // -------------------------------------------------------------------- + // Try to delete a non existing CF + // -------------------------------------------------------------------- + Transaction deleteTx = tm.begin(); + LOG.info("{} trying to delete a non-existing family created ", deleteTx); + Delete del = new Delete(Bytes.toBytes("row1")); + del.deleteFamily(famB); + // This delete should not put data on HBase + txTable.delete(deleteTx, del); + + // -------------------------------------------------------------------- + // Check data has not been written to HBase + // -------------------------------------------------------------------- + HTable table = new HTable(hbaseConf, TEST_TABLE); + Get get = new Get(Bytes.toBytes("row1")); + get.setTimeStamp(deleteTx.getTransactionId()); + Result result = table.get(get); + assertTrue(result.isEmpty()); + + } + + private int countRows(ResultScanner rs) throws IOException { + int count; + Result r = rs.next(); + count = 0; + while (r != null) { + count++; + LOG.trace("row: " + Bytes.toString(r.getRow()) + " count: " + count); + r = rs.next(); + } + return count; + } + + private void writeRows(TTable tt, Transaction t1, int rowcount, FamCol... famCols) throws IOException { + for (int i = 0; i < rowcount; i++) { + byte[] row = Bytes.toBytes("test-del" + i); + + Put p = new Put(row); + for (FamCol col : famCols) { + p.add(col.fam, col.col, data1); + } + tt.put(t1, p); + } + } + + private Map<FamCol, Integer> countColsInRows(ResultScanner rs, FamCol... famCols) throws IOException { + Map<FamCol, Integer> colCount = new HashMap<>(); + Result r = rs.next(); + while (r != null) { + for (FamCol col : famCols) { + if (r.containsColumn(col.fam, col.col)) { + Integer c = colCount.get(col); + + if (c == null) { + colCount.put(col, 1); + } else { + colCount.put(col, c + 1); + } + } + } + r = rs.next(); + } + return colCount; + } + +}