Repository: phoenix Updated Branches: refs/heads/txn ea523e7c5 -> 4906b8b2b
Keep table point deletes as index point deletes (WIP) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/4906b8b2 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/4906b8b2 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/4906b8b2 Branch: refs/heads/txn Commit: 4906b8b2b41b5c67fe6aa345a4311290e4da7c4b Parents: ea523e7 Author: James Taylor <jtay...@salesforce.com> Authored: Mon Apr 20 13:04:17 2015 -0700 Committer: James Taylor <jtay...@salesforce.com> Committed: Mon Apr 20 13:04:17 2015 -0700 ---------------------------------------------------------------------- .../apache/phoenix/index/IndexMaintainer.java | 36 ++++++++++++++++---- .../index/PhoenixTransactionalIndexer.java | 13 +++---- 2 files changed, 37 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/4906b8b2/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index 3500dd2..19dfaa0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -781,8 +781,10 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { return put; } - public boolean isRowDeleted(Collection<Cell> pendingUpdates) { + private enum DeleteType {SINGLE_VERSION, ALL_VERSIONS}; + private DeleteType getDeleteTypeOrNull(Collection<Cell> pendingUpdates) { int nDeleteCF = 0; + int nDeleteVersionCF = 0; for (Cell kv : pendingUpdates) { if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) { nDeleteCF++; @@ -790,12 +792,19 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { dataEmptyKeyValueCF, 0, dataEmptyKeyValueCF.length) == 0; // This is what a delete looks like on the client side for immutable indexing... if (isEmptyCF) { - return true; + return DeleteType.ALL_VERSIONS; } + } else if (kv.getTypeByte() == KeyValue.Type.DeleteFamilyVersion.getCode()) { + nDeleteVersionCF++; } } // This is what a delete looks like on the server side for mutable indexing... - return nDeleteCF == this.nDataCFs; + // Should all be one or the other for DeleteFamily versus DeleteFamilyVersion, but just in case not + return nDeleteVersionCF >= this.nDataCFs ? DeleteType.SINGLE_VERSION : nDeleteCF + nDeleteVersionCF >= this.nDataCFs ? DeleteType.ALL_VERSIONS : null; + } + + public boolean isRowDeleted(Collection<Cell> pendingUpdates) { + return getDeleteTypeOrNull(pendingUpdates) != null; } private boolean hasIndexedColumnChanged(ValueGetter oldState, Collection<Cell> pendingUpdates) throws IOException { @@ -841,8 +850,18 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter oldState, ImmutableBytesWritable dataRowKeyPtr, Collection<Cell> pendingUpdates, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException { byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, regionStartKey, regionEndKey); // Delete the entire row if any of the indexed columns changed - if (oldState == null || isRowDeleted(pendingUpdates) || hasIndexedColumnChanged(oldState, pendingUpdates)) { // Deleting the entire row - Delete delete = new Delete(indexRowKey, ts); + DeleteType deleteType = null; + if (oldState == null || (deleteType=getDeleteTypeOrNull(pendingUpdates)) != null || hasIndexedColumnChanged(oldState, pendingUpdates)) { // Deleting the entire row + Delete delete; + // If table delete was single version, then index delete should be as well + if (deleteType == DeleteType.SINGLE_VERSION) { + delete = new Delete(indexRowKey); + for (ColumnReference ref : getAllColumns()) { // FIXME: Keep Set<byte[]> for index CFs? + delete.deleteFamilyVersion(ref.getFamily(), ts); + } + } else { + delete = new Delete(indexRowKey, ts); + } delete.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); return delete; } @@ -856,7 +875,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { delete = new Delete(indexRowKey); delete.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); } - delete.deleteColumns(ref.getFamily(), IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts); + // If point delete for data table, then use point delete for index as well + if (kv.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) { + delete.deleteColumn(ref.getFamily(), IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts); + } else { + delete.deleteColumns(ref.getFamily(), IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts); + } } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/4906b8b2/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java index 6a13552..38d6fd1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Result; @@ -289,26 +288,28 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { } public void applyMutation() { - if (mutation instanceof Delete) { + /*if (mutation instanceof Delete) { valueMap.clear(); - } else { + } else */ { for (Cell cell : pendingUpdates) { - if (cell.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) { + if (cell.getTypeByte() == KeyValue.Type.Delete.getCode() || cell.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) { ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); valueMap.remove(ref); - } else if (cell.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) { + } else if (cell.getTypeByte() == KeyValue.Type.DeleteFamily.getCode() || cell.getTypeByte() == KeyValue.Type.DeleteFamilyVersion.getCode()) { for (ColumnReference ref : indexedColumns) { if (ref.matchesFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())) { valueMap.remove(ref); } } - } else { + } else if (cell.getTypeByte() == KeyValue.Type.Put.getCode()){ ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); if (indexedColumns.contains(ref)) { ImmutableBytesWritable ptr = new ImmutableBytesWritable(); ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); valueMap.put(ref, ptr); } + } else { + throw new IllegalStateException("Unexpected mutation type for " + cell); } } }