HBASE-15077 Support OffheapKV write in compaction with out copying data on heap.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/da932ee3 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/da932ee3 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/da932ee3 Branch: refs/heads/trunk Commit: da932ee38d29a13acb3e3ff6653402d4c8141d04 Parents: ec47a81 Author: anoopsjohn <anoopsamj...@gmail.com> Authored: Tue Jan 12 10:02:39 2016 +0530 Committer: anoopsjohn <anoopsamj...@gmail.com> Committed: Tue Jan 12 10:02:39 2016 +0530 ---------------------------------------------------------------------- .../apache/hadoop/hbase/OffheapKeyValue.java | 2 +- .../hadoop/hbase/io/ByteArrayOutputStream.java | 129 +++++++++++++++++++ .../hadoop/hbase/io/ByteBufferOutputStream.java | 3 +- .../io/ByteBufferSupportDataOutputStream.java | 44 +++++++ .../hbase/io/ByteBufferSupportOutputStream.java | 51 ++++++++ .../io/encoding/BufferedDataBlockEncoder.java | 4 +- .../hadoop/hbase/util/ByteBufferUtils.java | 26 +--- .../hadoop/hbase/io/hfile/HFileBlock.java | 23 +--- .../io/encoding/TestDataBlockEncoders.java | 11 +- .../io/hfile/TestHFileDataBlockEncoder.java | 7 +- 10 files changed, 245 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/da932ee3/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java index 0af64cd..ced1595 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java @@ -254,7 +254,7 @@ public class OffheapKeyValue extends ByteBufferedCell implements HeapSize, Clone length = keyLen + this.getValueLength() + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE; } ByteBufferUtils.putInt(out, length); - ByteBufferUtils.writeByteBuffer(out, this.buf, this.offset, length); + ByteBufferUtils.copyBufferToStream(out, this.buf, this.offset, length); return length + Bytes.SIZEOF_INT; } http://git-wip-us.apache.org/repos/asf/hbase/blob/da932ee3/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java new file mode 100644 index 0000000..d951595 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; +import java.util.Arrays; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ByteBufferUtils; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Our own implementation of ByteArrayOutputStream where all methods are NOT synchronized and + * supports writing ByteBuffer directly to it. + */ +@InterfaceAudience.Private +public class ByteArrayOutputStream extends OutputStream implements ByteBufferSupportOutputStream { + + // Borrowed from openJDK: + // http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/ArrayList.java#221 + private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; + + private byte[] buf; + private int pos = 0; + + public ByteArrayOutputStream() { + this(32); + } + + public ByteArrayOutputStream(int capacity) { + this.buf = new byte[capacity]; + } + + @Override + public void write(ByteBuffer b, int off, int len) throws IOException { + checkSizeAndGrow(len); + ByteBufferUtils.copyFromBufferToArray(this.buf, b, off, this.pos, len); + this.pos += len; + } + + @Override + public void writeInt(int i) throws IOException { + checkSizeAndGrow(Bytes.SIZEOF_INT); + Bytes.putInt(this.buf, this.pos, i); + this.pos += Bytes.SIZEOF_INT; + } + + @Override + public void write(int b) throws IOException { + checkSizeAndGrow(Bytes.SIZEOF_BYTE); + buf[this.pos] = (byte) b; + this.pos++; + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + checkSizeAndGrow(len); + System.arraycopy(b, off, this.buf, this.pos, len); + this.pos += len; + } + + private void checkSizeAndGrow(int extra) { + long capacityNeeded = this.pos + (long) extra; + if (capacityNeeded > this.buf.length) { + // guarantee it's possible to fit + if (capacityNeeded > MAX_ARRAY_SIZE) { + throw new BufferOverflowException(); + } + // double until hit the cap + long nextCapacity = Math.min(this.buf.length << 1, MAX_ARRAY_SIZE); + // but make sure there is enough if twice the existing capacity is still too small + nextCapacity = Math.max(nextCapacity, capacityNeeded); + if (nextCapacity > MAX_ARRAY_SIZE) { + throw new BufferOverflowException(); + } + byte[] newBuf = new byte[(int) nextCapacity]; + System.arraycopy(buf, 0, newBuf, 0, buf.length); + buf = newBuf; + } + } + + /** + * Resets the <code>pos</code> field of this byte array output stream to zero. The output stream + * can be used again. + */ + public void reset() { + this.pos = 0; + } + + /** + * Copies the content of this Stream into a new byte array. + * @return the contents of this output stream, as new byte array. + */ + public byte toByteArray()[] { + return Arrays.copyOf(buf, pos); + } + + /** + * @return the underlying array where the data gets accumulated + */ + public byte[] getBuffer() { + return this.buf; + } + + /** + * @return The current size of the buffer. + */ + public int size() { + return this.pos; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/da932ee3/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java index d91513e..d4bda18 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java @@ -37,7 +37,8 @@ import org.apache.hadoop.hbase.util.Bytes; */ @InterfaceAudience.Public @InterfaceStability.Evolving -public class ByteBufferOutputStream extends OutputStream { +public class ByteBufferOutputStream extends OutputStream + implements ByteBufferSupportOutputStream { // Borrowed from openJDK: // http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/ArrayList.java#221 http://git-wip-us.apache.org/repos/asf/hbase/blob/da932ee3/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportDataOutputStream.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportDataOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportDataOutputStream.java new file mode 100644 index 0000000..3a52e63 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportDataOutputStream.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ByteBufferUtils; + +/** + * Our extension of DataOutputStream which implements ByteBufferSupportOutputStream + */ +@InterfaceAudience.Private +public class ByteBufferSupportDataOutputStream extends DataOutputStream + implements ByteBufferSupportOutputStream { + + public ByteBufferSupportDataOutputStream(OutputStream out) { + super(out); + } + + @Override + public void write(ByteBuffer b, int off, int len) throws IOException { + ByteBufferUtils.copyBufferToStream(out, b, off, len); + written += len; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/da932ee3/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportOutputStream.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportOutputStream.java new file mode 100644 index 0000000..ccb5c81 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportOutputStream.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Interface adds support for writing {@link ByteBuffer} into OutputStream. + */ +@InterfaceAudience.Private +public interface ByteBufferSupportOutputStream { + + /** + * Writes <code>len</code> bytes from the specified ByteBuffer starting at offset <code>off</code> + * to this output stream. + * + * @param b the data. + * @param off the start offset in the data. + * @param len the number of bytes to write. + * @exception IOException + * if an I/O error occurs. In particular, an <code>IOException</code> is thrown if + * the output stream is closed. + */ + void write(ByteBuffer b, int off, int len) throws IOException; + + /** + * Writes an <code>int</code> to the underlying output stream as four + * bytes, high byte first. + * @param i the <code>int</code> to write + * @throws IOException if an I/O error occurs. + */ + void writeInt(int i) throws IOException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/da932ee3/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java index 112f258..33e38c7 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java @@ -673,14 +673,14 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder { // Write key out.write(keyBuffer.array()); // Write value - ByteBufferUtils.writeByteBuffer(out, this.valueBuffer, this.valueOffset, this.valueLength); + ByteBufferUtils.copyBufferToStream(out, this.valueBuffer, this.valueOffset, this.valueLength); if (withTags) { // 2 bytes tags length followed by tags bytes // tags length is serialized with 2 bytes only(short way) even if the type is int. // As this is non -ve numbers, we save the sign bit. See HBASE-11437 out.write((byte) (0xff & (this.tagsLength >> 8))); out.write((byte) (0xff & this.tagsLength)); - ByteBufferUtils.writeByteBuffer(out, this.tagsBuffer, this.tagsOffset, this.tagsLength); + ByteBufferUtils.copyBufferToStream(out, this.tagsBuffer, this.tagsOffset, this.tagsLength); } return lenToWrite + Bytes.SIZEOF_INT; } http://git-wip-us.apache.org/repos/asf/hbase/blob/da932ee3/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java index 6e3fcaa..62173c2 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java @@ -27,7 +27,7 @@ import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.io.ByteBufferOutputStream; +import org.apache.hadoop.hbase.io.ByteBufferSupportOutputStream; import org.apache.hadoop.hbase.io.util.StreamUtils; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.WritableUtils; @@ -141,8 +141,8 @@ public final class ByteBufferUtils { // We have writeInt in ByteBufferOutputStream so that it can directly write // int to underlying // ByteBuffer in one step. - if (out instanceof ByteBufferOutputStream) { - ((ByteBufferOutputStream) out).writeInt(value); + if (out instanceof ByteBufferSupportOutputStream) { + ((ByteBufferSupportOutputStream) out).writeInt(value); } else { StreamUtils.writeInt(out, value); } @@ -179,9 +179,10 @@ public final class ByteBufferUtils { */ public static void copyBufferToStream(OutputStream out, ByteBuffer in, int offset, int length) throws IOException { - if (in.hasArray()) { - out.write(in.array(), in.arrayOffset() + offset, - length); + if (out instanceof ByteBufferSupportOutputStream) { + ((ByteBufferSupportOutputStream) out).write(in, offset, length); + } else if (in.hasArray()) { + out.write(in.array(), in.arrayOffset() + offset, length); } else { for (int i = 0; i < length; ++i) { out.write(toByte(in, offset + i)); @@ -904,19 +905,6 @@ public final class ByteBufferUtils { } } - public static void writeByteBuffer(OutputStream out, ByteBuffer b, int offset, int length) - throws IOException { - // We have write which takes ByteBuffer in ByteBufferOutputStream so that it - // can directly write - // bytes from the src ByteBuffer to the destination ByteBuffer. This avoid - // need for temp array - // creation and copy - if (out instanceof ByteBufferOutputStream) { - ((ByteBufferOutputStream) out).write(b, offset, length); - } else { - ByteBufferUtils.copyBufferToStream(out, b, offset, length); - } - } // For testing purpose public static String toStringBinary(final ByteBuffer b, int off, int len) { StringBuilder result = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/hbase/blob/da932ee3/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index a68d0a6..6916ba0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.io.hfile; -import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutput; import java.io.DataOutputStream; @@ -35,7 +34,9 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; +import org.apache.hadoop.hbase.io.ByteArrayOutputStream; import org.apache.hadoop.hbase.io.ByteBuffInputStream; +import org.apache.hadoop.hbase.io.ByteBufferSupportDataOutputStream; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; @@ -915,7 +916,7 @@ public class HFileBlock implements Cacheable { state = State.WRITING; // We will compress it later in finishBlock() - userDataStream = new DataOutputStream(baosInMemory); + userDataStream = new ByteBufferSupportDataOutputStream(baosInMemory); if (newBlockType == BlockType.DATA) { this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream); } @@ -969,11 +970,8 @@ public class HFileBlock implements Cacheable { */ private void finishBlock() throws IOException { if (blockType == BlockType.DATA) { - BufferGrabbingByteArrayOutputStream baosInMemoryCopy = - new BufferGrabbingByteArrayOutputStream(); - baosInMemory.writeTo(baosInMemoryCopy); this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream, - baosInMemoryCopy.buf, blockType); + baosInMemory.getBuffer(), blockType); blockType = dataBlockEncodingCtx.getBlockType(); } userDataStream.flush(); @@ -1011,19 +1009,6 @@ public class HFileBlock implements Cacheable { onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum()); } - public static class BufferGrabbingByteArrayOutputStream extends ByteArrayOutputStream { - private byte[] buf; - - @Override - public void write(byte[] b, int off, int len) { - this.buf = b; - } - - public byte[] getBuffer() { - return this.buf; - } - } - /** * Put the header into the given byte array at the given offset. * @param onDiskSize size of the block on disk header + data + checksum http://git-wip-us.apache.org/repos/asf/hbase/blob/da932ee3/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java index 00969b2..1ef918c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -45,8 +44,8 @@ import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.ArrayBackedTag; import org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeSeeker; +import org.apache.hadoop.hbase.io.ByteArrayOutputStream; import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.hadoop.hbase.io.hfile.HFileBlock.Writer.BufferGrabbingByteArrayOutputStream; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.nio.SingleByteBuff; @@ -256,9 +255,7 @@ public class TestDataBlockEncoders { for (KeyValue kv : kvs) { encoder.encode(kv, encodingContext, dos); } - BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream(); - baos.writeTo(stream); - encoder.endBlockEncoding(encodingContext, dos, stream.getBuffer()); + encoder.endBlockEncoding(encodingContext, dos, baos.getBuffer()); byte[] encodedData = new byte[baos.size() - ENCODED_DATA_OFFSET]; System.arraycopy(baos.toByteArray(), ENCODED_DATA_OFFSET, encodedData, 0, encodedData.length); if (useOffheapData) { @@ -398,9 +395,7 @@ public class TestDataBlockEncoders { for (KeyValue kv : kvList) { encoder.encode(kv, encodingContext, dos); } - BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream(); - baos.writeTo(stream); - encoder.endBlockEncoding(encodingContext, dos, stream.getBuffer()); + encoder.endBlockEncoding(encodingContext, dos, baos.getBuffer()); byte[] encodedData = baos.toByteArray(); testAlgorithm(encodedData, unencodedDataBuf, encoder); http://git-wip-us.apache.org/repos/asf/hbase/blob/da932ee3/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java index 2523a8c..6f434bb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java @@ -20,7 +20,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -31,12 +30,12 @@ import java.util.List; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.ByteArrayOutputStream; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; -import org.apache.hadoop.hbase.io.hfile.HFileBlock.Writer.BufferGrabbingByteArrayOutputStream; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.ChecksumType; @@ -217,9 +216,7 @@ public class TestHFileDataBlockEncoder { for (KeyValue kv : kvs) { blockEncoder.encode(kv, context, dos); } - BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream(); - baos.writeTo(stream); - blockEncoder.endBlockEncoding(context, dos, stream.getBuffer(), BlockType.DATA); + blockEncoder.endBlockEncoding(context, dos, baos.getBuffer(), BlockType.DATA); byte[] encodedBytes = baos.toByteArray(); size = encodedBytes.length - block.getDummyHeaderForVersion().length; return new HFileBlock(context.getBlockType(), size, size, -1, ByteBuffer.wrap(encodedBytes),