This is an automated email from the ASF dual-hosted git repository. skadam pushed a commit to branch 4.x-HBase-1.5 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.5 by this push: new df3cc54 PHOENIX-5455 - IndexedKeyValue creation fails after HBASE-22034 (Geoffrey Jacoby) df3cc54 is described below commit df3cc54fb5ba4836a61983de9b607ec6b5e08cca Author: s.kadam <ska...@apache.org> AuthorDate: Thu Aug 29 12:39:45 2019 -0700 PHOENIX-5455 - IndexedKeyValue creation fails after HBASE-22034 (Geoffrey Jacoby) --- .../org/apache/phoenix/hbase/index/Indexer.java | 21 +--- .../phoenix/hbase/index/wal/IndexedKeyValue.java | 128 ++++++--------------- .../phoenix/hbase/index/wal/KeyValueCodec.java | 17 ++- .../regionserver/wal/IndexedKeyValueTest.java | 51 ++++++-- .../wal/ReadWriteKeyValuesWithCodecTest.java | 4 +- 5 files changed, 93 insertions(+), 128 deletions(-) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java index ff8b555..a3a5874 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java @@ -534,11 +534,13 @@ public class Indexer extends BaseRegionObserver { if (durability != Durability.SKIP_WAL) { // we have all the WAL durability, so we just update the WAL entry and move on for (Pair<Mutation, byte[]> entry : indexUpdates) { - edit.add(new IndexedKeyValue(entry.getSecond(), entry.getFirst())); + edit.add(IndexedKeyValue.newIndexedKeyValue(entry.getSecond(), + entry.getFirst())); + } } } } - } + } private void setBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) { @@ -630,21 +632,6 @@ public class Indexer extends BaseRegionObserver { } /** - * Search the {@link WALEdit} for the first {@link IndexedKeyValue} present - * @param edit {@link WALEdit} - * @return the first {@link IndexedKeyValue} in the {@link WALEdit} or <tt>null</tt> if not - * present - */ - private IndexedKeyValue getFirstIndexedKeyValue(WALEdit edit) { - for (Cell kv : edit.getCells()) { - if (kv instanceof IndexedKeyValue) { - return (IndexedKeyValue) kv; - } - } - return null; - } - - /** * Extract the index updates from the WAL Edit * @param edit to search for index updates * @return the mutations to apply to the index tables diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java index 5ad2435..33daad4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java @@ -23,6 +23,8 @@ import java.io.DataOutput; import java.io.IOException; import java.util.Arrays; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Mutation; @@ -50,13 +52,36 @@ public class IndexedKeyValue extends KeyValue { private boolean batchFinished = false; private int hashCode; - public IndexedKeyValue() {} + public static IndexedKeyValue newIndexedKeyValue(byte[] bs, Mutation m){ + Cell indexWALCell = adaptFirstCellFromMutation(m); + return new IndexedKeyValue(indexWALCell, bs, m); + } + + private static Cell adaptFirstCellFromMutation(Mutation m) { + if (m != null && m.getFamilyCellMap() != null && + m.getFamilyCellMap().firstEntry() != null && + m.getFamilyCellMap().firstEntry().getValue() != null + && m.getFamilyCellMap().firstEntry().getValue().get(0) != null) { + //have to replace the column family with WALEdit.METAFAMILY to make sure + //that IndexedKeyValues don't get replicated. The superclass KeyValue fields + //like row, qualifier and value are placeholders to prevent NPEs + // when using the KeyValue APIs. See PHOENIX-5188 / 5455 + Cell mutationCell = m.getFamilyCellMap().firstEntry().getValue().get(0); + return CellUtil.createCell(m.getRow(), WALEdit.METAFAMILY, + mutationCell.getQualifierArray(), mutationCell.getTimestamp(), + mutationCell.getTypeByte(), mutationCell.getValueArray()); + } else { + throw new IllegalArgumentException("Tried to create an IndexedKeyValue with a " + + "Mutation with no Cells!"); + } - public IndexedKeyValue(byte[] bs, Mutation mutation) { - this.bytes = mutation.getRow(); - this.offset = 0; - this.length = mutation.getRow().length; + } + //used for deserialization + public IndexedKeyValue() {} + + private IndexedKeyValue(Cell c, byte[] bs, Mutation mutation){ + super(c); this.indexTableName = new ImmutableBytesPtr(bs); this.mutation = mutation; this.hashCode = calcHashCode(indexTableName, mutation); @@ -70,74 +95,6 @@ public class IndexedKeyValue extends KeyValue { return mutation; } - /* - * Returns a faked column family for an IndexedKeyValue instance - */ - @Override - public byte [] getFamily() { - return WALEdit.METAFAMILY; - } - - @Override - public byte[] getFamilyArray() { - return WALEdit.METAFAMILY; - } - - /** - * @return Family offset - */ - @Override - public int getFamilyOffset() { - return 0; - } - - /** - * @return Family length - */ - @Override - public byte getFamilyLength() { - return (byte) WALEdit.METAFAMILY.length; - } - - @Override - public byte[] getQualifierArray() { - return COLUMN_QUALIFIER; - } - - /** - * @return Qualifier offset - */ - @Override - public int getQualifierOffset() { - return 0; - } - - /** - * @return Qualifier length - */ - @Override - public int getQualifierLength() { - return COLUMN_QUALIFIER.length; - } - - @Override - public int getRowOffset() { - return this.offset; - } - - @Override - public short getRowLength() { - return (short) this.length; - } - - @Override - public int getKeyLength(){ - //normally the key is row key + other key fields such as timestamp, - // but those aren't defined here because a Mutation can contain multiple, - // so we just return the length of the row key - return this.length; - } - @Override public String toString() { return "IndexWrite:\n\ttable: " + indexTableName + "\n\tmutation:" + mutation; @@ -175,9 +132,9 @@ public class IndexedKeyValue extends KeyValue { } /** - * Internal write the underlying data for the entry - this does not do any special prefixing. Writing should be done - * via {@link KeyValueCodec#write(DataOutput, KeyValue)} to ensure consistent reading/writing of - * {@link IndexedKeyValue}s. + * Internal write the underlying data for the entry - this does not do any special prefixing. + * Writing should be done via {@link KeyValueCodec#write(DataOutput, KeyValue)} to ensure + * consistent reading/writing of {@link IndexedKeyValue}s. * * @param out * to write data to. Does not close or flush the passed object. @@ -190,25 +147,6 @@ public class IndexedKeyValue extends KeyValue { Bytes.writeByteArray(out, m.toByteArray()); } - /** - * This method shouldn't be used - you should use {@link KeyValueCodec#readKeyValue(DataInput)} instead. Its the - * complement to {@link #writeData(DataOutput)}. - */ - @SuppressWarnings("javadoc") - public void readFields(DataInput in) throws IOException { - this.indexTableName = new ImmutableBytesPtr(Bytes.readByteArray(in)); - byte[] mutationData = Bytes.readByteArray(in); - MutationProto mProto = MutationProto.parseFrom(mutationData); - this.mutation = org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(mProto); - this.hashCode = calcHashCode(indexTableName, mutation); - if (mutation != null){ - bytes = mutation.getRow(); - offset = 0; - length = bytes.length; - } - - } - public boolean getBatchFinished() { return this.batchFinished; } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/KeyValueCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/KeyValueCodec.java index 682a504..9678c7b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/KeyValueCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/KeyValueCodec.java @@ -25,9 +25,12 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; /** * Codec to encode/decode {@link KeyValue}s and {@link IndexedKeyValue}s within a {@link WALEdit} @@ -70,8 +73,16 @@ public class KeyValueCodec { int length = in.readInt(); // its a special IndexedKeyValue if (length == INDEX_TYPE_LENGTH_MARKER) { - IndexedKeyValue kv = new IndexedKeyValue(); - kv.readFields(in); + ImmutableBytesPtr indexTableName = new ImmutableBytesPtr(Bytes.readByteArray(in)); + byte[] mutationData = Bytes.readByteArray(in); + ClientProtos.MutationProto mProto = ClientProtos.MutationProto.parseFrom(mutationData); + Mutation mutation = org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(mProto); + IndexedKeyValue kv = null; + if (mutation != null){ + kv = IndexedKeyValue.newIndexedKeyValue(indexTableName.copyBytesIfNecessary(), mutation); + } else { + kv = new IndexedKeyValue(); + } return kv; } else { return KeyValue.create(length, in); diff --git a/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/IndexedKeyValueTest.java b/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/IndexedKeyValueTest.java index 7f34fcd..594ac3c 100644 --- a/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/IndexedKeyValueTest.java +++ b/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/IndexedKeyValueTest.java @@ -17,10 +17,10 @@ package org.apache.hadoop.hbase.regionserver.wal; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.phoenix.hbase.index.wal.IndexedKeyValue; import org.apache.phoenix.hbase.index.wal.KeyValueCodec; import org.junit.Assert; @@ -30,17 +30,45 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.IOException; public class IndexedKeyValueTest { + private static final byte[] ROW_KEY = Bytes.toBytes("foo"); + private static final byte[] FAMILY = Bytes.toBytes("family"); + private static final byte[] QUALIFIER = Bytes.toBytes("qualifier"); + private static final byte[] VALUE = Bytes.toBytes("value"); + private static final byte[] TABLE_NAME = Bytes.toBytes("MyTableName"); + + @Test + public void testIndexedKeyValueExceptionWhenMutationEmpty() throws IOException { + boolean caughtNullMutation = false, caughtNullEntry = false; + try { + IndexedKeyValue ikv = IndexedKeyValue.newIndexedKeyValue(TABLE_NAME, null); + } catch (IllegalArgumentException iae){ + caughtNullMutation = true; + } + try { + Mutation m = new Put(ROW_KEY); + IndexedKeyValue ikv = IndexedKeyValue.newIndexedKeyValue(TABLE_NAME, m); + } catch (IllegalArgumentException iae){ + caughtNullEntry = true; + } + //no need to test adding a mutation with a Cell with just a row key; HBase will put in + //a default cell with family byte[0], qualifier and value of "", and LATEST_TIMESTAMP + + Assert.assertTrue(caughtNullMutation & caughtNullEntry); + + } + @Test public void testIndexedKeyValuePopulatesKVFields() throws Exception { - byte[] row = Bytes.toBytes("foo"); - byte[] tableNameBytes = Bytes.toBytes("MyTableName"); - Mutation mutation = new Put(row); - IndexedKeyValue indexedKeyValue = new IndexedKeyValue(tableNameBytes, mutation); - testIndexedKeyValueHelper(indexedKeyValue, row, tableNameBytes, mutation); + byte[] row = (ROW_KEY); + Put mutation = new Put(row); + mutation.addColumn(FAMILY, QUALIFIER, VALUE); + IndexedKeyValue indexedKeyValue = IndexedKeyValue.newIndexedKeyValue(TABLE_NAME, mutation); + testIndexedKeyValueHelper(indexedKeyValue, row, TABLE_NAME, mutation); //now serialize the IndexedKeyValue and make sure the deserialized copy also //has all the right fields @@ -51,17 +79,16 @@ public class IndexedKeyValueTest { IndexedKeyValue deSerializedKV = (IndexedKeyValue) KeyValueCodec.readKeyValue(new DataInputStream( new ByteArrayInputStream(baos.toByteArray()))); - testIndexedKeyValueHelper(deSerializedKV, row, tableNameBytes, mutation); + testIndexedKeyValueHelper(deSerializedKV, row, TABLE_NAME, mutation); } - private void testIndexedKeyValueHelper(IndexedKeyValue indexedKeyValue, byte[] row, byte[] tableNameBytes, Mutation mutation) { - Assert.assertArrayEquals(row, indexedKeyValue.getRowArray()); - Assert.assertEquals(0, indexedKeyValue.getRowOffset()); - Assert.assertEquals(row.length, indexedKeyValue.getRowLength()); + private void testIndexedKeyValueHelper(IndexedKeyValue indexedKeyValue, byte[] row, + byte[] tableNameBytes, Mutation mutation) { + Assert.assertArrayEquals(row, CellUtil.cloneRow(indexedKeyValue)); Assert.assertArrayEquals(tableNameBytes, indexedKeyValue.getIndexTable()); Assert.assertEquals(mutation.toString(), indexedKeyValue.getMutation().toString()); - Assert.assertArrayEquals(WALEdit.METAFAMILY, indexedKeyValue.getFamilyArray()); + Assert.assertArrayEquals(WALEdit.METAFAMILY, CellUtil.cloneFamily(indexedKeyValue)); } } diff --git a/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ReadWriteKeyValuesWithCodecTest.java b/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ReadWriteKeyValuesWithCodecTest.java index 8bb491d..03082c0 100644 --- a/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ReadWriteKeyValuesWithCodecTest.java +++ b/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ReadWriteKeyValuesWithCodecTest.java @@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; @@ -110,7 +111,8 @@ public class ReadWriteKeyValuesWithCodecTest { WALEdit justIndexUpdates = new WALEdit(); byte[] table = Bytes.toBytes("targetTable"); - IndexedKeyValue ikv = new IndexedKeyValue(table, p); + Cell c = CellUtil.createCell(p.getRow()); + IndexedKeyValue ikv = IndexedKeyValue.newIndexedKeyValue(table, p); justIndexUpdates.add(ikv); edits.add(justIndexUpdates);