Repository: incubator-omid Updated Branches: refs/heads/master 1f83aeda9 -> 4c8d7821f
[OMID-74] Efficient column family deletion in Row level conflict analysis The idea is to use a qualifier to denote that all the columns of a specific family were deleted. Current implementation reads from HBase the entire family and then writes a tombstone to each one of its cells. The new implementation does not need to perform the read and only writes the qualifier to denote that the family was deleted. This is true only for Row level conflict detection since in Cell level we need to read the cells and add these to the write set. Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/4c8d7821 Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/4c8d7821 Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/4c8d7821 Branch: refs/heads/master Commit: 4c8d7821fbbad943f46f3dfc0ac51b0ea908f90f Parents: 1f83aed Author: Ohad Shacham <oh...@yahoo-inc.com> Authored: Sun Aug 6 13:35:06 2017 +0300 Committer: Ohad Shacham <oh...@yahoo-inc.com> Committed: Sun Aug 6 13:35:06 2017 +0300 ---------------------------------------------------------------------- .../transaction/HBaseTransactionManager.java | 5 + .../org/apache/omid/transaction/TTable.java | 174 +++++++++++++++---- .../omid/transaction/TestColumnIterator.java | 2 +- .../omid/transaction/TestShadowCells.java | 7 +- .../org/apache/omid/transaction/CellUtils.java | 1 + .../org/apache/omid/tso/client/TSOClient.java | 8 + 6 files changed, 159 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4c8d7821/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java index 484eb19..67df93e 100644 --- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java +++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java @@ -30,6 +30,7 @@ import org.apache.omid.committable.hbase.HBaseCommitTable; import org.apache.omid.committable.hbase.HBaseCommitTableConfig; import org.apache.omid.tools.hbase.HBaseLogin; import org.apache.omid.tso.client.CellId; +import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel; import org.apache.omid.tso.client.TSOClient; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; @@ -260,6 +261,10 @@ public class HBaseTransactionManager extends AbstractTransactionManager implemen } + public ConflictDetectionLevel getConflictDetectionLevel() { + return tsoClient.getConflictDetectionLevel(); + } + static class CommitTimestampLocatorImpl implements CommitTimestampLocator { private HBaseCellId hBaseCellId; http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4c8d7821/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java b/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java index 44a1a82..24f10ec 100644 --- a/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java +++ b/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.omid.committable.CommitTable.CommitTimestamp; import org.apache.omid.transaction.AbstractTransaction.VisibilityLevel; import org.apache.omid.transaction.HBaseTransactionManager.CommitTimestampLocatorImpl; +import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +62,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.NavigableMap; import java.util.NavigableSet; +import java.util.Set; /** * Provides transactional methods for accessing and modifying a given snapshot of data identified by an opaque {@link @@ -150,6 +152,8 @@ public class TTable implements Closeable { tsget.addColumn(family, qualifier); tsget.addColumn(family, CellUtils.addShadowCellSuffix(qualifier)); } + tsget.addColumn(family, CellUtils.FAMILY_DELETE_QUALIFIER); + tsget.addColumn(family, CellUtils.addShadowCellSuffix(CellUtils.FAMILY_DELETE_QUALIFIER)); } } LOG.trace("Initial Get = {}", tsget); @@ -159,12 +163,40 @@ public class TTable implements Closeable { Result result = table.get(tsget); List<Cell> filteredKeyValues = Collections.emptyList(); if (!result.isEmpty()) { - filteredKeyValues = filterCellsForSnapshot(result.listCells(), transaction, tsget.getMaxVersions()); + filteredKeyValues = filterCellsForSnapshot(result.listCells(), transaction, tsget.getMaxVersions(), new HashMap<String, List<Cell>>()); } return Result.create(filteredKeyValues); } + private void familyQualifierBasedDeletion(HBaseTransaction tx, Put deleteP, Get deleteG) throws IOException { + Result result = this.get(tx, deleteG); + if (!result.isEmpty()) { + for (Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> entryF : result.getMap() + .entrySet()) { + byte[] family = entryF.getKey(); + for (Entry<byte[], NavigableMap<Long, byte[]>> entryQ : entryF.getValue().entrySet()) { + byte[] qualifier = entryQ.getKey(); + tx.addWriteSetElement(new HBaseCellId(table, deleteP.getRow(), family, qualifier, + tx.getWriteTimestamp())); + } + deleteP.add(family, CellUtils.FAMILY_DELETE_QUALIFIER, tx.getWriteTimestamp(), + HConstants.EMPTY_BYTE_ARRAY); + } + } + } + + private void familyQualifierBasedDeletionWithOutRead(HBaseTransaction tx, Put deleteP, Get deleteG) { + Set<byte[]> fset = deleteG.getFamilyMap().keySet(); + + for (byte[] family : fset) { + deleteP.add(family, CellUtils.FAMILY_DELETE_QUALIFIER, tx.getWriteTimestamp(), + HConstants.EMPTY_BYTE_ARRAY); + } + tx.addWriteSetElement(new HBaseCellId(table, deleteP.getRow(), null, null, + tx.getWriteTimestamp())); + } + /** * Transactional version of {@link HTableInterface#delete(Delete delete)} * @@ -178,15 +210,16 @@ public class TTable implements Closeable { HBaseTransaction transaction = enforceHBaseTransactionAsParam(tx); - final long writeTimestamp = transaction.getWriteTimestamp(); - boolean issueGet = false; + final long writeTimestamp = transaction.getStartTimestamp(); + boolean deleteFamily = false; final Put deleteP = new Put(delete.getRow(), writeTimestamp); final Get deleteG = new Get(delete.getRow()); Map<byte[], List<Cell>> fmap = delete.getFamilyCellMap(); if (fmap.isEmpty()) { - issueGet = true; + familyQualifierBasedDeletion(transaction, deleteP, deleteG); } + for (List<Cell> cells : fmap.values()) { for (Cell cell : cells) { CellUtils.validateCell(cell, writeTimestamp); @@ -205,7 +238,7 @@ public class TTable implements Closeable { break; case DeleteFamily: deleteG.addFamily(CellUtil.cloneFamily(cell)); - issueGet = true; + deleteFamily = true; break; case Delete: if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP) { @@ -229,21 +262,11 @@ public class TTable implements Closeable { } } } - if (issueGet) { - // It's better to perform a transactional get to avoid deleting more - // than necessary - Result result = this.get(transaction, deleteG); - if (!result.isEmpty()) { - for (Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> entryF : result.getMap() - .entrySet()) { - byte[] family = entryF.getKey(); - for (Entry<byte[], NavigableMap<Long, byte[]>> entryQ : entryF.getValue().entrySet()) { - byte[] qualifier = entryQ.getKey(); - deleteP.add(family, qualifier, CellUtils.DELETE_TOMBSTONE); - transaction.addWriteSetElement(new HBaseCellId(table, delete.getRow(), family, qualifier, - writeTimestamp)); - } - } + if (deleteFamily) { + if (enforceHBaseTransactionManagerAsParam(transaction.getTransactionManager()).getConflictDetectionLevel() == ConflictDetectionLevel.ROW) { + familyQualifierBasedDeletionWithOutRead(transaction, deleteP, deleteG); + } else { + familyQualifierBasedDeletion(transaction, deleteP, deleteG); } } @@ -320,11 +343,39 @@ public class TTable implements Closeable { for (byte[] qualifier : qualifiers) { tsscan.addColumn(family, CellUtils.addShadowCellSuffix(qualifier)); } + if (!qualifiers.isEmpty()) { + scan.addColumn(entry.getKey(), CellUtils.FAMILY_DELETE_QUALIFIER); + } } return new TransactionalClientScanner(transaction, tsscan, 1); } /** + * Check whether a cell was deleted using family deletion marker + * + * @param cell The cell to check + * @param transaction Defines the current snapshot + * @param familyDeletionCache Accumulates the family deletion markers to identify cells that deleted with a higher version + * @param commitCache Holds shadow cells information + * @return Whether the cell was deleted + */ + private boolean checkFamilyDeletionCache(Cell cell, HBaseTransaction transaction, Map<String, List<Cell>> familyDeletionCache, Map<Long, Long> commitCache) throws IOException { + List<Cell> familyDeletionCells = familyDeletionCache.get(Bytes.toString((cell.getRow()))); + if (familyDeletionCells != null) { + for(Cell familyDeletionCell : familyDeletionCells) { + String family = Bytes.toString(cell.getFamily()); + String familyDeletion = Bytes.toString(familyDeletionCell.getFamily()); + if (family.equals(familyDeletion)) { + Optional<Long> familyDeletionCommitTimestamp = getCommitTimestamp(familyDeletionCell, transaction, commitCache); + if (familyDeletionCommitTimestamp.isPresent() && familyDeletionCommitTimestamp.get() >= cell.getTimestamp()) { + return true; + } + } + } + } + return false; + } + /** * Filters the raw results returned from HBase and returns only those belonging to the current snapshot, as defined * by the transaction object. If the raw results don't contain enough information for a particular qualifier, it * will request more versions from HBase. @@ -332,10 +383,11 @@ public class TTable implements Closeable { * @param rawCells Raw cells that we are going to filter * @param transaction Defines the current snapshot * @param versionsToRequest Number of versions requested from hbase + * @param familyDeletionCache Accumulates the family deletion markers to identify cells that deleted with a higher version * @return Filtered KVs belonging to the transaction snapshot */ List<Cell> filterCellsForSnapshot(List<Cell> rawCells, HBaseTransaction transaction, - int versionsToRequest) throws IOException { + int versionsToRequest, Map<String, List<Cell>> familyDeletionCache) throws IOException { assert (rawCells != null && transaction != null && versionsToRequest >= 1); @@ -348,11 +400,22 @@ public class TTable implements Closeable { } Map<Long, Long> commitCache = buildCommitCache(rawCells); + buildFamilyDeletionCache(rawCells, familyDeletionCache); - for (Collection<Cell> columnCells : groupCellsByColumnFilteringShadowCells(rawCells)) { + for (Collection<Cell> columnCells : groupCellsByColumnFilteringShadowCellsAndFamilyDeletion(rawCells)) { boolean snapshotValueFound = false; Cell oldestCell = null; for (Cell cell : columnCells) { + snapshotValueFound = checkFamilyDeletionCache(cell, transaction, familyDeletionCache, commitCache); + + if (snapshotValueFound == true) { + if (transaction.getVisibilityLevel() == VisibilityLevel.SNAPSHOT_ALL) { + snapshotValueFound = false; + } else { + break; + } + } + if (isCellInTransaction(cell, transaction, commitCache) || isCellInSnapshot(cell, transaction, commitCache)) { if (!CellUtil.matchingValue(cell, CellUtils.DELETE_TOMBSTONE)) { @@ -382,7 +445,7 @@ public class TTable implements Closeable { for (Result pendingGetResult : pendingGetsResults) { if (!pendingGetResult.isEmpty()) { keyValuesInSnapshot.addAll( - filterCellsForSnapshot(pendingGetResult.listCells(), transaction, numberOfVersionsToFetch)); + filterCellsForSnapshot(pendingGetResult.listCells(), transaction, numberOfVersionsToFetch, familyDeletionCache)); } } } @@ -405,6 +468,38 @@ public class TTable implements Closeable { return commitCache; } + private void buildFamilyDeletionCache(List<Cell> rawCells, Map<String, List<Cell>> familyDeletionCache) { + + for (Cell cell : rawCells) { + if (CellUtil.matchingQualifier(cell, CellUtils.FAMILY_DELETE_QUALIFIER) && + CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY)) { + + String row = Bytes.toString(cell.getRow()); + List<Cell> cells = familyDeletionCache.get(row); + if (cells == null) { + cells = new ArrayList<>(); + familyDeletionCache.put(row, cells); + } + + cells.add(cell); + } + } + + } + + private Optional<Long> getCommitTimestamp(Cell kv, HBaseTransaction transaction, Map<Long, Long> commitCache) + throws IOException { + + long startTimestamp = transaction.getStartTimestamp(); + + if (kv.getTimestamp() == startTimestamp) { + return Optional.of(startTimestamp); + } + + return tryToLocateCellCommitTimestamp(transaction.getTransactionManager(), transaction.getEpoch(), kv, + commitCache); + } + private boolean isCellInTransaction(Cell kv, HBaseTransaction transaction, Map<Long, Long> commitCache) { long startTimestamp = transaction.getStartTimestamp(); @@ -423,13 +518,9 @@ public class TTable implements Closeable { private boolean isCellInSnapshot(Cell kv, HBaseTransaction transaction, Map<Long, Long> commitCache) throws IOException { - long startTimestamp = transaction.getStartTimestamp(); - - Optional<Long> commitTimestamp = - tryToLocateCellCommitTimestamp(transaction.getTransactionManager(), transaction.getEpoch(), kv, - commitCache); + Optional<Long> commitTimestamp = getCommitTimestamp(kv, transaction, commitCache); - return commitTimestamp.isPresent() && commitTimestamp.get() < startTimestamp; + return commitTimestamp.isPresent() && commitTimestamp.get() < transaction.getStartTimestamp(); } private Get createPendingGet(Cell cell, int versionCount) throws IOException { @@ -507,12 +598,14 @@ public class TTable implements Closeable { private HBaseTransaction state; private ResultScanner innerScanner; private int maxVersions; + Map<String, List<Cell>> familyDeletionCache; TransactionalClientScanner(HBaseTransaction state, Scan scan, int maxVersions) throws IOException { this.state = state; this.innerScanner = table.getScanner(scan); this.maxVersions = maxVersions; + this.familyDeletionCache = new HashMap<String, List<Cell>>(); } @@ -525,7 +618,7 @@ public class TTable implements Closeable { return null; } if (!result.isEmpty()) { - filteredResult = filterCellsForSnapshot(result.listCells(), state, maxVersions); + filteredResult = filterCellsForSnapshot(result.listCells(), state, maxVersions, familyDeletionCache); } } return Result.create(filteredResult); @@ -814,13 +907,26 @@ public class TTable implements Closeable { } } - static ImmutableList<Collection<Cell>> groupCellsByColumnFilteringShadowCells(List<Cell> rawCells) { + private HBaseTransactionManager enforceHBaseTransactionManagerAsParam(TransactionManager tm) { + if (tm instanceof HBaseTransactionManager) { + return (HBaseTransactionManager) tm; + } else { + throw new IllegalArgumentException( + String.format("The transaction manager object passed %s is not an instance of HBaseTransactionManager ", + tm.getClass().getName())); + } + } + + static ImmutableList<Collection<Cell>> groupCellsByColumnFilteringShadowCellsAndFamilyDeletion(List<Cell> rawCells) { - Predicate<Cell> shadowCellFilter = new Predicate<Cell>() { + Predicate<Cell> shadowCellAndFamilyDeletionFilter = new Predicate<Cell>() { @Override public boolean apply(Cell cell) { - return cell != null && !CellUtils.isShadowCell(cell); + boolean familyDeletionMarkerCondition = CellUtil.matchingQualifier(cell, CellUtils.FAMILY_DELETE_QUALIFIER) && + CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY); + + return cell != null && !CellUtils.isShadowCell(cell) && !familyDeletionMarkerCondition; } }; @@ -834,7 +940,7 @@ public class TTable implements Closeable { }; - return Multimaps.index(Iterables.filter(rawCells, shadowCellFilter), cellToColumnWrapper) + return Multimaps.index(Iterables.filter(rawCells, shadowCellAndFamilyDeletionFilter), cellToColumnWrapper) .asMap().values() .asList(); } http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4c8d7821/hbase-client/src/test/java/org/apache/omid/transaction/TestColumnIterator.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestColumnIterator.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestColumnIterator.java index 2eacd22..a20fb3b 100644 --- a/hbase-client/src/test/java/org/apache/omid/transaction/TestColumnIterator.java +++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestColumnIterator.java @@ -61,7 +61,7 @@ public class TestColumnIterator { public void testGroupingCellsByColumnFilteringShadowCells() { ImmutableList<Collection<Cell>> groupedColumnsWithoutShadowCells = - TTable.groupCellsByColumnFilteringShadowCells(cells); + TTable.groupCellsByColumnFilteringShadowCellsAndFamilyDeletion(cells); Log.info("Column Groups " + groupedColumnsWithoutShadowCells); assertEquals(groupedColumnsWithoutShadowCells.size(), 3, "Should be 3 column groups"); int group1Counter = 0; http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4c8d7821/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java index 75e64fd..8a35f9a 100644 --- a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java +++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java @@ -20,10 +20,9 @@ package org.apache.omid.transaction; import com.google.common.base.Charsets; import com.google.common.base.Optional; import com.google.common.util.concurrent.ListenableFuture; -import org.apache.omid.committable.CommitTable; +import org.apache.omid.committable.CommitTable; import org.apache.omid.metrics.NullMetricsProvider; - import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; @@ -45,7 +44,9 @@ import org.testng.ITestContext; import org.testng.annotations.Test; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; @@ -345,7 +346,7 @@ public class TestShadowCells extends OmidTestBase { return (List<KeyValue>) invocation.callRealMethod(); } }).when(table).filterCellsForSnapshot(Matchers.<List<Cell>>any(), - any(HBaseTransaction.class), anyInt()); + any(HBaseTransaction.class), anyInt(), Matchers.<Map<String, List<Cell>>>any()); TransactionManager tm = newTransactionManager(context); if (hasShadowCell(row, http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4c8d7821/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java b/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java index 3f2425b..70c3e67 100644 --- a/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java +++ b/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java @@ -49,6 +49,7 @@ public final class CellUtils { private static final Logger LOG = LoggerFactory.getLogger(CellUtils.class); static final byte[] SHADOW_CELL_SUFFIX = "\u0080".getBytes(Charsets.UTF_8); // Non printable char (128 ASCII) static byte[] DELETE_TOMBSTONE = Bytes.toBytes("__OMID_TOMBSTONE__"); + public static final byte[] FAMILY_DELETE_QUALIFIER = new byte[0]; /** * Utility interface to get rid of the dependency on HBase server package http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4c8d7821/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java ---------------------------------------------------------------------- diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java index 6504cf8..1c62876 100644 --- a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java +++ b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java @@ -299,6 +299,14 @@ public class TSOClient implements TSOProtocol, NodeCacheListener { return epoch; } + /** + * Used for family deletion + * @return the conflict detection level. + */ + public ConflictDetectionLevel getConflictDetectionLevel() { + return conflictDetectionLevel; + } + // ---------------------------------------------------------------------------------------------------------------- // NodeCacheListener interface // ----------------------------------------------------------------------------------------------------------------