PARQUET-787: Limit read allocation size WIP: This updates the `ParquetFileReader` to use multiple buffers when reading a row group, instead of a single humongous allocation. As a consequence, many classes needed to be updated to accept a stream backed by multiple buffers, instead of using a single buffer directly. Continuing to assume a single contiguous buffer would require too many copies.
Author: Ryan Blue <b...@apache.org> Closes #390 from rdblue/PARQUET-787-limit-read-allocation-size and squashes the following commits: 4abba3e7a [Ryan Blue] PARQUET-787: Update byte buffer input streams for review comments. e7c6c5dd2 [Ryan Blue] PARQUET-787: Fix problems from Zoltan's review. be52b59fa [Ryan Blue] PARQUET-787: Update tests for both ByteBufferInputStreams. b0b614748 [Ryan Blue] PARQUET-787: Update encodings to use ByteBufferInputStream. a4fa05ac5 [Ryan Blue] Refactor ByteBufferInputStream implementations. 56b22a6a1 [Ryan Blue] Make allocation size configurable. 103ed3d86 [Ryan Blue] Add tests for ByteBufferInputStream and fix bugs. 614a2bbc8 [Ryan Blue] Limit allocation size to 8MB chunks for better garbage collection. Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/8bbc6cb9 Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/8bbc6cb9 Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/8bbc6cb9 Branch: refs/heads/master Commit: 8bbc6cb95fd9b4b9e86c924ca1e40fd555ecac1d Parents: ad80bfe Author: Ryan Blue <b...@apache.org> Authored: Wed Feb 21 09:40:07 2018 -0800 Committer: Ryan Blue <b...@apache.org> Committed: Wed Feb 21 09:40:07 2018 -0800 ---------------------------------------------------------------------- .../parquet/column/impl/ColumnReaderImpl.java | 30 +- .../parquet/column/values/ValuesReader.java | 36 +- .../bitpacking/BitPackingValuesReader.java | 15 +- .../bitpacking/ByteBitPackingValuesReader.java | 48 +- .../delta/DeltaBinaryPackingValuesReader.java | 40 +- .../DeltaLengthByteArrayValuesReader.java | 30 +- .../deltastrings/DeltaByteArrayReader.java | 11 +- .../dictionary/DictionaryValuesReader.java | 9 +- .../dictionary/PlainValuesDictionary.java | 17 +- .../values/plain/BinaryPlainValuesReader.java | 31 +- .../values/plain/BooleanPlainValuesReader.java | 16 +- .../FixedLenByteArrayPlainValuesReader.java | 29 +- 
.../column/values/plain/PlainValuesReader.java | 15 +- .../rle/RunLengthBitPackingHybridDecoder.java | 2 - .../RunLengthBitPackingHybridValuesReader.java | 19 +- .../values/rle/ZeroIntegerValuesReader.java | 11 +- .../column/impl/TestCorruptDeltaByteArrays.java | 17 +- .../org/apache/parquet/column/values/Utils.java | 21 +- .../values/bitpacking/BitPackingPerfTest.java | 3 +- .../values/bitpacking/TestBitPackingColumn.java | 3 +- ...BinaryPackingValuesWriterForIntegerTest.java | 17 +- ...ltaBinaryPackingValuesWriterForLongTest.java | 15 +- .../BenchmarkReadingRandomIntegers.java | 3 +- .../TestDeltaLengthByteArray.java | 6 +- .../BenchmarkDeltaLengthByteArray.java | 9 +- .../values/deltastrings/TestDeltaByteArray.java | 10 +- .../benchmark/BenchmarkDeltaByteArray.java | 17 +- .../values/dictionary/TestDictionary.java | 36 +- ...unLengthBitPackingHybridIntegrationTest.java | 2 +- .../TestRunLengthBitPackingHybridEncoder.java | 2 - .../parquet/bytes/ByteBufferInputStream.java | 86 ++- .../org/apache/parquet/bytes/BytesInput.java | 98 ++- .../parquet/bytes/MultiBufferInputStream.java | 382 ++++++++++++ .../parquet/bytes/SingleBufferInputStream.java | 177 ++++++ .../bytes/TestByteBufferInputStreams.java | 597 +++++++++++++++++++ .../bytes/TestMultiBufferInputStream.java | 141 +++++ .../bytes/TestSingleBufferInputStream.java | 130 ++++ .../org/apache/parquet/HadoopReadOptions.java | 9 +- .../org/apache/parquet/ParquetReadOptions.java | 50 +- .../org/apache/parquet/hadoop/CodecFactory.java | 2 +- .../parquet/hadoop/DirectCodecFactory.java | 4 +- .../parquet/hadoop/ParquetFileReader.java | 91 +-- .../parquet/hadoop/TestDirectCodecFactory.java | 6 +- 43 files changed, 1852 insertions(+), 441 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java 
---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java index 931b4b1..8b47977 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java @@ -24,12 +24,11 @@ import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL; import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL; import static org.apache.parquet.column.ValuesType.VALUES; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.nio.ByteBuffer; import org.apache.parquet.CorruptDeltaByteArrays; import org.apache.parquet.VersionParser.ParsedVersion; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.ColumnDescriptor; @@ -549,7 +548,7 @@ public class ColumnReaderImpl implements ColumnReader { }); } - private void initDataReader(Encoding dataEncoding, ByteBuffer bytes, int offset, int valueCount) { + private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int valueCount) { ValuesReader previousReader = this.dataColumn; this.currentEncoding = dataEncoding; @@ -565,13 +564,15 @@ public class ColumnReaderImpl implements ColumnReader { } else { this.dataColumn = dataEncoding.getValuesReader(path, VALUES); } + if (dataEncoding.usesDictionary() && converter.hasDictionarySupport()) { bindToDictionary(dictionary); } else { bind(path.getType()); } + try { - dataColumn.initFromPage(pageValueCount, bytes, offset); + dataColumn.initFromPage(pageValueCount, in); } catch (IOException e) { throw new ParquetDecodingException("could not read page in col " + path, e); } @@ -589,16 +590,15 @@ public class ColumnReaderImpl 
implements ColumnReader { this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader); this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader); try { - ByteBuffer bytes = page.getBytes().toByteBuffer(); - LOG.debug("page size {} bytes and {} records", bytes.remaining(), pageValueCount); + BytesInput bytes = page.getBytes(); + LOG.debug("page size {} bytes and {} records", bytes.size(), pageValueCount); LOG.debug("reading repetition levels at 0"); - rlReader.initFromPage(pageValueCount, bytes, 0); - int next = rlReader.getNextOffset(); - LOG.debug("reading definition levels at {}", next); - dlReader.initFromPage(pageValueCount, bytes, next); - next = dlReader.getNextOffset(); - LOG.debug("reading data at {}", next); - initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount()); + ByteBufferInputStream in = bytes.toInputStream(); + rlReader.initFromPage(pageValueCount, in); + LOG.debug("reading definition levels at {}", in.position()); + dlReader.initFromPage(pageValueCount, in); + LOG.debug("reading data at {}", in.position()); + initDataReader(page.getValueEncoding(), in, page.getValueCount()); } catch (IOException e) { throw new ParquetDecodingException("could not read page " + page + " in col " + path, e); } @@ -607,9 +607,9 @@ public class ColumnReaderImpl implements ColumnReader { private void readPageV2(DataPageV2 page) { this.repetitionLevelColumn = newRLEIterator(path.getMaxRepetitionLevel(), page.getRepetitionLevels()); this.definitionLevelColumn = newRLEIterator(path.getMaxDefinitionLevel(), page.getDefinitionLevels()); + LOG.debug("page data size {} bytes and {} records", page.getData().size(), pageValueCount); try { - LOG.debug("page data size {} bytes and {} records", page.getData().size(), pageValueCount); - initDataReader(page.getDataEncoding(), page.getData().toByteBuffer(), 0, page.getValueCount()); + initDataReader(page.getDataEncoding(), page.getData().toInputStream(), page.getValueCount()); } catch (IOException e) 
{ throw new ParquetDecodingException("could not read page " + page + " in col " + path, e); } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java index 03aa2f8..b2ec2a5 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java @@ -20,8 +20,7 @@ package org.apache.parquet.column.values; import java.io.IOException; -import java.nio.ByteBuffer; -import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.io.api.Binary; /** @@ -40,8 +39,9 @@ public abstract class ValuesReader { /** * Called to initialize the column reader from a part of a page. * - * The underlying implementation knows how much data to read, so a length - * is not provided. + * Implementations must consume all bytes from the input stream, leaving the + * stream ready to read the next section of data. The underlying + * implementation knows how much data to read, so a length is not provided. * * Each page may contain several sections: * <ul> @@ -50,36 +50,12 @@ public abstract class ValuesReader { * <li> data column * </ul> * - * This function is called with 'offset' pointing to the beginning of one of these sections, - * and should return the offset to the section following it. 
- * * @param valueCount count of values in this page - * @param page the array to read from containing the page data (repetition levels, definition levels, data) - * @param offset where to start reading from in the page + * @param in an input stream containing the page data at the correct offset * * @throws IOException */ - public abstract void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOException; - - /** - * Same functionality as method of the same name that takes a ByteBuffer instead of a byte[]. - * - * This method is only provided for backward compatibility and will be removed in a future release. - * Please update any code using it as soon as possible. - * @see #initFromPage(int, ByteBuffer, int) - */ - @Deprecated - public void initFromPage(int valueCount, byte[] page, int offset) throws IOException { - this.initFromPage(valueCount, ByteBuffer.wrap(page), offset); - } - - /** - * Called to return offset of the next section - * @return offset of the next section - */ - public int getNextOffset() { - throw new ParquetDecodingException("Unsupported: cannot get offset of the next section."); - } + public abstract void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException; /** * usable when the encoding is dictionary based http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java index a5608cb..bcc828b 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java @@ -22,7 +22,6 
@@ import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt; import static org.apache.parquet.column.values.bitpacking.BitPacking.createBitPackingReader; import java.io.IOException; -import java.nio.ByteBuffer; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.BytesUtils; @@ -44,7 +43,6 @@ public class BitPackingValuesReader extends ValuesReader { private ByteBufferInputStream in; private BitPackingReader bitPackingReader; private final int bitsPerValue; - private int nextOffset; /** * @param bound the maximum value stored by this column @@ -68,21 +66,16 @@ public class BitPackingValuesReader extends ValuesReader { /** * {@inheritDoc} - * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int, ByteBuffer, int) + * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int, ByteBufferInputStream) */ @Override - public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException { + public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { int effectiveBitLength = valueCount * bitsPerValue; int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength); LOG.debug("reading {} bytes for {} values of size {} bits.", length, valueCount, bitsPerValue); - this.in = new ByteBufferInputStream(in, offset, length); - this.bitPackingReader = createBitPackingReader(bitsPerValue, this.in, valueCount); - this.nextOffset = offset + length; - } - @Override - public int getNextOffset() { - return nextOffset; + this.in = stream.sliceStream(length); + this.bitPackingReader = createBitPackingReader(bitsPerValue, this.in, valueCount); } @Override http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java ---------------------------------------------------------------------- diff --git 
a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java index 7c19340..0445d25 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java @@ -19,11 +19,12 @@ package org.apache.parquet.column.values.bitpacking; import java.io.IOException; -import java.util.Arrays; import java.nio.ByteBuffer; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.io.ParquetDecodingException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,9 +37,7 @@ public class ByteBitPackingValuesReader extends ValuesReader { private final BytePacker packer; private final int[] decoded = new int[VALUES_AT_A_TIME]; private int decodedPosition = VALUES_AT_A_TIME - 1; - private ByteBuffer encoded; - private int encodedPos; - private int nextOffset; + private ByteBufferInputStream in; public ByteBitPackingValuesReader(int bound, Packer packer) { this.bitWidth = BytesUtils.getWidthFromMaxInt(bound); @@ -49,37 +48,38 @@ public class ByteBitPackingValuesReader extends ValuesReader { public int readInteger() { ++ decodedPosition; if (decodedPosition == decoded.length) { - encoded.position(encodedPos); - if (encodedPos + bitWidth > encoded.limit()) { - // unpack8Values needs at least bitWidth bytes to read from, - // We have to fill in 0 byte at the end of encoded bytes. 
- byte[] tempEncode = new byte[bitWidth]; - encoded.get(tempEncode, 0, encoded.limit() - encodedPos); - packer.unpack8Values(tempEncode, 0, decoded, 0); - } else { - packer.unpack8Values(encoded, encodedPos, decoded, 0); + try { + if (in.available() < bitWidth) { + // unpack8Values needs at least bitWidth bytes to read from, + // We have to fill in 0 byte at the end of encoded bytes. + byte[] tempEncode = new byte[bitWidth]; + in.read(tempEncode, 0, in.available()); + packer.unpack8Values(tempEncode, 0, decoded, 0); + } else { + ByteBuffer encoded = in.slice(bitWidth); + packer.unpack8Values(encoded, encoded.position(), decoded, 0); + } + } catch (IOException e) { + throw new ParquetDecodingException("Failed to read packed values", e); } - encodedPos += bitWidth; decodedPosition = 0; } return decoded[decodedPosition]; } @Override - public void initFromPage(int valueCount, ByteBuffer page, int offset) + public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { int effectiveBitLength = valueCount * bitWidth; int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength); // ceil - LOG.debug("reading {} bytes for {} values of size {} bits.", length, valueCount, bitWidth); - this.encoded = page; - this.encodedPos = offset; + LOG.debug("reading {} bytes for {} values of size {} bits.", + length, valueCount, bitWidth); + // work-around for null values. this will not happen for repetition or + // definition levels (never null), but will happen when valueCount has not + // been adjusted for null values in the data. 
+ length = Math.min(length, stream.available()); + this.in = stream.sliceStream(length); this.decodedPosition = VALUES_AT_A_TIME - 1; - this.nextOffset = offset + length; - } - - @Override - public int getNextOffset() { - return nextOffset; } @Override http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java index a3355d2..bf53846 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java @@ -18,7 +18,6 @@ */ package org.apache.parquet.column.values.delta; -import java.io.ByteArrayInputStream; import java.io.IOException; import org.apache.parquet.bytes.ByteBufferInputStream; @@ -28,7 +27,6 @@ import org.apache.parquet.column.values.bitpacking.BytePackerForLong; import org.apache.parquet.column.values.bitpacking.Packer; import org.apache.parquet.io.ParquetDecodingException; -import java.io.IOException; import java.nio.ByteBuffer; /** @@ -43,7 +41,7 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader { */ private int valuesRead; private long minDeltaInCurrentBlock; - private ByteBuffer page; + /** * stores the decoded values including the first value which is written to the header */ @@ -54,23 +52,16 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader { */ private int valuesBuffered; private ByteBufferInputStream in; - private int nextOffset; private DeltaBinaryPackingConfig config; private int[] bitWidths; /** - * eagerly load all the data into memory - * - * @param 
valueCount count of values in this page - * @param page the array to read from containing the page data (repetition levels, definition levels, data) - * @param offset where to start reading from in the page - * @throws IOException + * eagerly loads all the data into memory */ @Override - public void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOException { - in = new ByteBufferInputStream(page, offset, page.limit() - offset); + public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { + this.in = stream; this.config = DeltaBinaryPackingConfig.readConfig(in); - this.page = page; this.totalValueCount = BytesUtils.readUnsignedVarInt(in); allocateValuesBuffer(); bitWidths = new int[config.miniBlockNumInABlock]; @@ -81,14 +72,8 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader { while (valuesBuffered < totalValueCount) { //values Buffered could be more than totalValueCount, since we flush on a mini block basis loadNewBlockToBuffer(); } - this.nextOffset = page.limit() - in.available(); } - - @Override - public int getNextOffset() { - return nextOffset; - } - + /** * the value buffer is allocated so that the size of it is multiple of mini block * because when writing, data is flushed on a mini block basis @@ -123,7 +108,7 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader { } } - private void loadNewBlockToBuffer() { + private void loadNewBlockToBuffer() throws IOException { try { minDeltaInCurrentBlock = BytesUtils.readZigZagVarLong(in); } catch (IOException e) { @@ -152,19 +137,18 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader { * * @param packer the packer created from bitwidth of current mini block */ - private void unpackMiniBlock(BytePackerForLong packer) { + private void unpackMiniBlock(BytePackerForLong packer) throws IOException { for (int j = 0; j < config.miniBlockSizeInValues; j += 8) { unpack8Values(packer); } } - private void 
unpack8Values(BytePackerForLong packer) { - //calculate the pos because the packer api uses array not stream - int pos = page.limit() - in.available(); - packer.unpack8Values(page, pos, valuesBuffer, valuesBuffered); + private void unpack8Values(BytePackerForLong packer) throws IOException { + // get a single buffer of 8 values. most of the time, this won't require a copy + // TODO: update the packer to consume from an InputStream + ByteBuffer buffer = in.slice(packer.getBitWidth()); + packer.unpack8Values(buffer, buffer.position(), valuesBuffer, valuesBuffered); this.valuesBuffered += 8; - //sync the pos in stream - in.skip(packer.getBitWidth()); } private void readBitWidthsForMiniBlocks() { http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java index d810ba8..e6ee1fd 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java @@ -22,8 +22,10 @@ package org.apache.parquet.column.values.deltalengthbytearray; import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader; +import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.api.Binary; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,34 +40,38 @@ 
public class DeltaLengthByteArrayValuesReader extends ValuesReader { private static final Logger LOG = LoggerFactory.getLogger(DeltaLengthByteArrayValuesReader.class); private ValuesReader lengthReader; - private ByteBuffer in; - private int offset; + private ByteBufferInputStream in; public DeltaLengthByteArrayValuesReader() { this.lengthReader = new DeltaBinaryPackingValuesReader(); } @Override - public void initFromPage(int valueCount, ByteBuffer in, int offset) + public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { - LOG.debug("init from page at offset {} for length {}", offset, (in.limit() - offset)); - lengthReader.initFromPage(valueCount, in, offset); - offset = lengthReader.getNextOffset(); - this.in = in; - this.offset = offset; + LOG.debug("init from page at offset {} for length {}", + stream.position(), stream.available()); + lengthReader.initFromPage(valueCount, stream); + this.in = stream.remainingStream(); } @Override public Binary readBytes() { int length = lengthReader.readInteger(); - int start = offset; - offset = start + length; - return Binary.fromConstantByteBuffer(in, start, length); + try { + return Binary.fromConstantByteBuffer(in.slice(length)); + } catch (IOException e) { + throw new ParquetDecodingException("Failed to read " + length + " bytes"); + } } @Override public void skip() { int length = lengthReader.readInteger(); - offset = offset + length; + try { + in.skipFully(length); + } catch (IOException e) { + throw new ParquetDecodingException("Failed to skip " + length + " bytes"); + } } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java 
b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java index 742b515..7a01627 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java @@ -21,6 +21,8 @@ package org.apache.parquet.column.values.deltastrings; import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.column.values.RequiresPreviousReader; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader; @@ -46,13 +48,12 @@ public class DeltaByteArrayReader extends ValuesReader implements RequiresPrevio } @Override - public void initFromPage(int valueCount, ByteBuffer page, int offset) + public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { - prefixLengthReader.initFromPage(valueCount, page, offset); - int next = prefixLengthReader.getNextOffset(); - suffixReader.initFromPage(valueCount, page, next); + prefixLengthReader.initFromPage(valueCount, stream); + suffixReader.initFromPage(valueCount, stream); } - + @Override public void skip() { // read the next value to skip so that previous is correct. 
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java index 19ff47c..87edda6 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java @@ -52,11 +52,12 @@ public class DictionaryValuesReader extends ValuesReader { } @Override - public void initFromPage(int valueCount, ByteBuffer page, int offset) + public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { - this.in = new ByteBufferInputStream(page, offset, page.limit() - offset); - if (page.limit() - offset > 0) { - LOG.debug("init from page at offset {} for length {}", offset, (page.limit() - offset)); + this.in = stream.remainingStream(); + if (in.available() > 0) { + LOG.debug("init from page at offset {} for length {}", + stream.position(), stream.available()); int bitWidth = BytesUtils.readIntLittleEndianOnOneByte(in); LOG.debug("bit width {}", bitWidth); decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in); http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java index 0fa6cc6..0b8beb2 100644 --- 
a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import org.apache.parquet.Preconditions; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.column.Dictionary; import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.column.values.plain.PlainValuesReader.DoublePlainValuesReader; @@ -150,10 +151,10 @@ public abstract class PlainValuesDictionary extends Dictionary { */ public PlainLongDictionary(DictionaryPage dictionaryPage) throws IOException { super(dictionaryPage); - final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer(); + ByteBufferInputStream in = dictionaryPage.getBytes().toInputStream(); longDictionaryContent = new long[dictionaryPage.getDictionarySize()]; LongPlainValuesReader longReader = new LongPlainValuesReader(); - longReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, dictionaryByteBuf.position()); + longReader.initFromPage(dictionaryPage.getDictionarySize(), in); for (int i = 0; i < longDictionaryContent.length; i++) { longDictionaryContent[i] = longReader.readLong(); } @@ -193,10 +194,10 @@ public abstract class PlainValuesDictionary extends Dictionary { */ public PlainDoubleDictionary(DictionaryPage dictionaryPage) throws IOException { super(dictionaryPage); - final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer(); + ByteBufferInputStream in = dictionaryPage.getBytes().toInputStream(); doubleDictionaryContent = new double[dictionaryPage.getDictionarySize()]; DoublePlainValuesReader doubleReader = new DoublePlainValuesReader(); - doubleReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, 0); + doubleReader.initFromPage(dictionaryPage.getDictionarySize(), in); for (int i = 0; i 
< doubleDictionaryContent.length; i++) { doubleDictionaryContent[i] = doubleReader.readDouble(); } @@ -236,10 +237,10 @@ public abstract class PlainValuesDictionary extends Dictionary { */ public PlainIntegerDictionary(DictionaryPage dictionaryPage) throws IOException { super(dictionaryPage); - final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer(); + ByteBufferInputStream in = dictionaryPage.getBytes().toInputStream(); intDictionaryContent = new int[dictionaryPage.getDictionarySize()]; IntegerPlainValuesReader intReader = new IntegerPlainValuesReader(); - intReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, 0); + intReader.initFromPage(dictionaryPage.getDictionarySize(), in); for (int i = 0; i < intDictionaryContent.length; i++) { intDictionaryContent[i] = intReader.readInteger(); } @@ -279,10 +280,10 @@ public abstract class PlainValuesDictionary extends Dictionary { */ public PlainFloatDictionary(DictionaryPage dictionaryPage) throws IOException { super(dictionaryPage); - final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer(); + ByteBufferInputStream in = dictionaryPage.getBytes().toInputStream(); floatDictionaryContent = new float[dictionaryPage.getDictionarySize()]; FloatPlainValuesReader floatReader = new FloatPlainValuesReader(); - floatReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, dictionaryByteBuf.position()); + floatReader.initFromPage(dictionaryPage.getDictionarySize(), in); for (int i = 0; i < floatDictionaryContent.length; i++) { floatDictionaryContent[i] = floatReader.readFloat(); } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java 
b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java index 82e5551..6411325 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java @@ -20,8 +20,8 @@ package org.apache.parquet.column.values.plain; import java.io.IOException; -import java.nio.ByteBuffer; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.io.ParquetDecodingException; @@ -31,40 +31,37 @@ import org.slf4j.LoggerFactory; public class BinaryPlainValuesReader extends ValuesReader { private static final Logger LOG = LoggerFactory.getLogger(BinaryPlainValuesReader.class); - private ByteBuffer in; - private int offset; + private ByteBufferInputStream in; @Override public Binary readBytes() { try { - int length = BytesUtils.readIntLittleEndian(in, offset); - int start = offset + 4; - offset = start + length; - return Binary.fromConstantByteBuffer(in, start, length); + int length = BytesUtils.readIntLittleEndian(in); + return Binary.fromConstantByteBuffer(in.slice(length)); } catch (IOException e) { - throw new ParquetDecodingException("could not read bytes at offset " + offset, e); + throw new ParquetDecodingException("could not read bytes at offset " + in.position(), e); } catch (RuntimeException e) { - throw new ParquetDecodingException("could not read bytes at offset " + offset, e); + throw new ParquetDecodingException("could not read bytes at offset " + in.position(), e); } } @Override public void skip() { try { - int length = BytesUtils.readIntLittleEndian(in, offset); - offset += 4 + length; + int length = BytesUtils.readIntLittleEndian(in); + in.skipFully(length); } catch (IOException e) { - throw new ParquetDecodingException("could not skip bytes at offset " + offset, e); + 
throw new ParquetDecodingException("could not skip bytes at offset " + in.position(), e); } catch (RuntimeException e) { - throw new ParquetDecodingException("could not skip bytes at offset " + offset, e); + throw new ParquetDecodingException("could not skip bytes at offset " + in.position(), e); } } @Override - public void initFromPage(int valueCount, ByteBuffer in, int offset) + public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { - LOG.debug("init from page at offset {} for length {}", offset, (in.limit() - offset)); - this.in = in; - this.offset = offset; + LOG.debug("init from page at offset {} for length {}", + stream.position(), (stream.available() - stream.position())); + this.in = stream.remainingStream(); } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java index 1f8fc2c..3296daa 100755 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java @@ -21,8 +21,8 @@ package org.apache.parquet.column.values.plain; import static org.apache.parquet.column.values.bitpacking.Packer.LITTLE_ENDIAN; import java.io.IOException; -import java.nio.ByteBuffer; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.bitpacking.ByteBitPackingValuesReader; import org.slf4j.Logger; @@ -60,17 +60,11 @@ public class BooleanPlainValuesReader extends ValuesReader { /** * {@inheritDoc} - * @see 
org.apache.parquet.column.values.ValuesReader#initFromPage(int valueCount, ByteBuffer page, int offset) + * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int, ByteBufferInputStream) */ @Override - public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException { - LOG.debug("init from page at offset {} for length {}", offset, (in.limit() - offset)); - this.in.initFromPage(valueCount, in, offset); + public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { + LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available()); + this.in.initFromPage(valueCount, stream); } - - @Override - public int getNextOffset() { - return this.in.getNextOffset(); - } - } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java index 7a14f81..7738de7 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java @@ -20,6 +20,7 @@ package org.apache.parquet.column.values.plain; import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.api.Binary; @@ -33,9 +34,9 @@ import org.slf4j.LoggerFactory; */ public class FixedLenByteArrayPlainValuesReader extends ValuesReader { private static final Logger LOG = 
LoggerFactory.getLogger(FixedLenByteArrayPlainValuesReader.class); - private ByteBuffer in; - private int offset; - private int length; + + private final int length; + private ByteBufferInputStream in; public FixedLenByteArrayPlainValuesReader(int length) { this.length = length; @@ -44,24 +45,26 @@ public class FixedLenByteArrayPlainValuesReader extends ValuesReader { @Override public Binary readBytes() { try { - int start = offset; - offset = start + length; - return Binary.fromConstantByteBuffer(in, start, length); - } catch (RuntimeException e) { - throw new ParquetDecodingException("could not read bytes at offset " + offset, e); + return Binary.fromConstantByteBuffer(in.slice(length)); + } catch (IOException | RuntimeException e) { + throw new ParquetDecodingException("could not read bytes at offset " + in.position(), e); } } @Override public void skip() { - offset += length; + try { + in.skipFully(length); + } catch (IOException | RuntimeException e) { + throw new ParquetDecodingException("could not skip bytes at offset " + in.position(), e); + } } @Override - public void initFromPage(int valueCount, ByteBuffer in, int offset) + public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { - LOG.debug("init from page at offset {} for length {}", offset, (in.limit() - offset)); - this.in = in; - this.offset = offset; + LOG.debug("init from page at offset {} for length {}", + stream.position(), stream.available()); + this.in = stream.remainingStream(); } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java index e79cbb2..726f611 100644 --- 
a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java @@ -19,7 +19,6 @@ package org.apache.parquet.column.values.plain; import java.io.IOException; -import java.nio.ByteBuffer; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.LittleEndianDataInputStream; @@ -39,18 +38,10 @@ abstract public class PlainValuesReader extends ValuesReader { protected LittleEndianDataInputStream in; - /** - * {@inheritDoc} - * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int, ByteBuffer, int) - */ @Override - public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException { - LOG.debug("init from page at offset {} for length {}", offset , (in.limit() - offset)); - this.in = new LittleEndianDataInputStream(toInputStream(in, offset)); - } - - private ByteBufferInputStream toInputStream(ByteBuffer in, int offset) { - return new ByteBufferInputStream(in.duplicate(), offset, in.limit() - offset); + public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { + LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available()); + this.in = new LittleEndianDataInputStream(stream.remainingStream()); } public static class DoublePlainValuesReader extends PlainValuesReader { http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java index 6daa349..d682a98 100644 --- 
a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java @@ -22,9 +22,7 @@ package org.apache.parquet.column.values.rle; import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; -import java.nio.ByteBuffer; -import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.Preconditions; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.values.bitpacking.BytePacker; http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java index 4ccf2b8..ebfa76d 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java @@ -19,7 +19,6 @@ package org.apache.parquet.column.values.rle; import java.io.IOException; -import java.nio.ByteBuffer; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.BytesUtils; @@ -35,26 +34,16 @@ import org.apache.parquet.io.ParquetDecodingException; public class RunLengthBitPackingHybridValuesReader extends ValuesReader { private final int bitWidth; private RunLengthBitPackingHybridDecoder decoder; - private int nextOffset; public RunLengthBitPackingHybridValuesReader(int bitWidth) { this.bitWidth = bitWidth; } @Override - public void initFromPage(int valueCountL, ByteBuffer page, int offset) throws 
IOException { - ByteBufferInputStream in = new ByteBufferInputStream(page, offset, page.limit() - offset); - int length = BytesUtils.readIntLittleEndian(in); - - decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in); - - // 4 is for the length which is stored as 4 bytes little endian - this.nextOffset = offset + length + 4; - } - - @Override - public int getNextOffset() { - return this.nextOffset; + public void initFromPage(int valueCountL, ByteBufferInputStream stream) throws IOException { + int length = BytesUtils.readIntLittleEndian(stream); + this.decoder = new RunLengthBitPackingHybridDecoder( + bitWidth, stream.sliceStream(length)); } @Override http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java index f8ff8d0..fe00de9 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java @@ -21,6 +21,7 @@ package org.apache.parquet.column.values.rle; import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.column.values.ValuesReader; /** @@ -30,20 +31,12 @@ import org.apache.parquet.column.values.ValuesReader; */ public class ZeroIntegerValuesReader extends ValuesReader { - private int nextOffset; - public int readInteger() { return 0; } @Override - public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException { - this.nextOffset = offset; - } - - @Override - public int getNextOffset() { - return nextOffset; + public void 
initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { } @Override http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java index 1f39d95..5bcbb88 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java @@ -21,6 +21,7 @@ package org.apache.parquet.column.impl; import org.apache.parquet.CorruptDeltaByteArrays; import org.apache.parquet.SemanticVersion; import org.apache.parquet.VersionParser.ParsedVersion; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Encoding; @@ -100,13 +101,13 @@ public class TestCorruptDeltaByteArrays { ByteBuffer corruptPageBytes = writer.getBytes().toByteBuffer(); DeltaByteArrayReader firstPageReader = new DeltaByteArrayReader(); - firstPageReader.initFromPage(10, firstPageBytes, 0); + firstPageReader.initFromPage(10, ByteBufferInputStream.wrap(firstPageBytes)); for (int i = 0; i < 10; i += 1) { - assertEquals(firstPageReader.readBytes().toStringUsingUTF8(), str(i)); + assertEquals(str(i), firstPageReader.readBytes().toStringUsingUTF8()); } DeltaByteArrayReader corruptPageReader = new DeltaByteArrayReader(); - corruptPageReader.initFromPage(10, corruptPageBytes, 0); + corruptPageReader.initFromPage(10, ByteBufferInputStream.wrap(corruptPageBytes)); try { corruptPageReader.readBytes(); fail("Corrupt page did not throw an exception when read"); @@ -115,7 +116,7 @@ public class 
TestCorruptDeltaByteArrays { } DeltaByteArrayReader secondPageReader = new DeltaByteArrayReader(); - secondPageReader.initFromPage(10, corruptPageBytes, 0); + secondPageReader.initFromPage(10, ByteBufferInputStream.wrap(corruptPageBytes)); secondPageReader.setPreviousReader(firstPageReader); for (int i = 10; i < 20; i += 1) { @@ -140,13 +141,13 @@ public class TestCorruptDeltaByteArrays { ByteBuffer secondPageBytes = writer.getBytes().toByteBuffer(); DeltaByteArrayReader firstPageReader = new DeltaByteArrayReader(); - firstPageReader.initFromPage(10, firstPageBytes, 0); + firstPageReader.initFromPage(10, ByteBufferInputStream.wrap(firstPageBytes)); for (int i = 0; i < 10; i += 1) { assertEquals(firstPageReader.readBytes().toStringUsingUTF8(), str(i)); } DeltaByteArrayReader secondPageReader = new DeltaByteArrayReader(); - secondPageReader.initFromPage(10, secondPageBytes, 0); + secondPageReader.initFromPage(10, ByteBufferInputStream.wrap(secondPageBytes)); secondPageReader.setPreviousReader(firstPageReader); for (int i = 10; i < 20; i += 1) { @@ -171,13 +172,13 @@ public class TestCorruptDeltaByteArrays { ByteBuffer secondPageBytes = writer.getBytes().toByteBuffer(); DeltaByteArrayReader firstPageReader = new DeltaByteArrayReader(); - firstPageReader.initFromPage(10, firstPageBytes, 0); + firstPageReader.initFromPage(10, ByteBufferInputStream.wrap(firstPageBytes)); for (int i = 0; i < 10; i += 1) { assertEquals(firstPageReader.readBytes().toStringUsingUTF8(), str(i)); } DeltaByteArrayReader secondPageReader = new DeltaByteArrayReader(); - secondPageReader.initFromPage(10, secondPageBytes, 0); + secondPageReader.initFromPage(10, ByteBufferInputStream.wrap(secondPageBytes)); for (int i = 10; i < 20; i += 1) { assertEquals(secondPageReader.readBytes().toStringUsingUTF8(), str(i)); http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java 
---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java b/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java index 8caad2b..248e039 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Random; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.io.api.Binary; /** @@ -59,33 +60,23 @@ public class Utils { } } - public static Binary[] readData(ValuesReader reader, byte[] data, int offset, int length) + public static Binary[] readData(ValuesReader reader, ByteBufferInputStream stream, int length) throws IOException { Binary[] bins = new Binary[length]; - reader.initFromPage(length, ByteBuffer.wrap(data), 0); + reader.initFromPage(length, stream); for(int i=0; i < length; i++) { bins[i] = reader.readBytes(); } return bins; } - - public static Binary[] readData(ValuesReader reader, byte[] data, int length) - throws IOException { - return readData(reader, data, 0, length); - } - - public static int[] readInts(ValuesReader reader, byte[] data, int offset, int length) + + public static int[] readInts(ValuesReader reader, ByteBufferInputStream stream, int length) throws IOException { int[] ints = new int[length]; - reader.initFromPage(length, ByteBuffer.wrap(data), offset); + reader.initFromPage(length, stream); for(int i=0; i < length; i++) { ints[i] = reader.readInteger(); } return ints; } - - public static int[] readInts(ValuesReader reader, byte[] data, int length) - throws IOException { - return readInts(reader, data, 0, length); - } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java 
---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java index 2733b72..656623c 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java @@ -22,6 +22,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.bitpacking.BitPacking.BitPackingWriter; @@ -88,7 +89,7 @@ public class BitPackingPerfTest { System.out.print(" no gc <"); for (int k = 0; k < N; k++) { long t2 = System.nanoTime(); - r.initFromPage(result.length, ByteBuffer.wrap(bytes), 0); + r.initFromPage(result.length, ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes))); for (int i = 0; i < result.length; i++) { result[i] = r.readInteger(); } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java index d83628a..867af28 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java @@ -25,6 +25,7 @@ import static org.apache.parquet.column.values.bitpacking.Packer.BIG_ENDIAN; import java.io.IOException; import 
java.nio.ByteBuffer; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.junit.Test; import org.apache.parquet.bytes.DirectByteBufferAllocator; @@ -175,7 +176,7 @@ public class TestBitPackingColumn { LOG.debug("bytes: {}", TestBitPacking.toString(bytes)); assertEquals(type.toString(), expected, TestBitPacking.toString(bytes)); ValuesReader r = type.getReader(bound); - r.initFromPage(vals.length, ByteBuffer.wrap(bytes), 0); + r.initFromPage(vals.length, ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes))); int[] result = new int[vals.length]; for (int i = 0; i < result.length; i++) { result[i] = r.readInteger(); http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForIntegerTest.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForIntegerTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForIntegerTest.java index a3bec4a..ff4a308 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForIntegerTest.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForIntegerTest.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Random; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.junit.Before; import org.junit.Test; @@ -143,7 +144,7 @@ public class DeltaBinaryPackingValuesWriterForIntegerTest { } @Test - public void shouldReturnCorrectOffsetAfterInitialization() throws IOException { + public void shouldConsumePageDataInInitialization() throws IOException { int[] data = new int[2 * blockSize + 3]; for (int i = 0; i < data.length; i++) { data[i] = i * 32; @@ -157,12 +158,14 @@ public class 
DeltaBinaryPackingValuesWriterForIntegerTest { int contentOffsetInPage = 33; System.arraycopy(valueContent, 0, pageContent, contentOffsetInPage, valueContent.length); - //offset should be correct - reader.initFromPage(100, ByteBuffer.wrap(pageContent), contentOffsetInPage); - int offset= reader.getNextOffset(); + // offset should be correct + ByteBufferInputStream stream = ByteBufferInputStream.wrap(ByteBuffer.wrap(pageContent)); + stream.skipFully(contentOffsetInPage); + reader.initFromPage(100, stream); + long offset = stream.position(); assertEquals(valueContent.length + contentOffsetInPage, offset); - //should be able to read data correclty + // should be able to read data correctly for (int i : data) { assertEquals(i, reader.readInteger()); } @@ -191,7 +194,7 @@ public class DeltaBinaryPackingValuesWriterForIntegerTest { } writeData(data); reader = new DeltaBinaryPackingValuesReader(); - reader.initFromPage(100, writer.getBytes().toByteBuffer(), 0); + reader.initFromPage(100, writer.getBytes().toInputStream()); for (int i = 0; i < data.length; i++) { if (i % 3 == 0) { reader.skip(); @@ -247,7 +250,7 @@ public class DeltaBinaryPackingValuesWriterForIntegerTest { + blockFlushed * miniBlockNum //bitWidth of mini blocks + (5.0 * blockFlushed);//min delta for each block assertTrue(estimatedSize >= page.length); - reader.initFromPage(100, ByteBuffer.wrap(page), 0); + reader.initFromPage(100, ByteBufferInputStream.wrap(ByteBuffer.wrap(page))); for (int i = 0; i < length; i++) { assertEquals(data[i], reader.readInteger()); http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLongTest.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLongTest.java 
b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLongTest.java index 34e1800..795a591 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLongTest.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLongTest.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Random; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.junit.Before; import org.junit.Test; @@ -157,12 +158,14 @@ public class DeltaBinaryPackingValuesWriterForLongTest { int contentOffsetInPage = 33; System.arraycopy(valueContent, 0, pageContent, contentOffsetInPage, valueContent.length); - //offset should be correct - reader.initFromPage(100, ByteBuffer.wrap(pageContent), contentOffsetInPage); - int offset = reader.getNextOffset(); + // offset should be correct + ByteBufferInputStream stream = ByteBufferInputStream.wrap(ByteBuffer.wrap(pageContent)); + stream.skipFully(contentOffsetInPage); + reader.initFromPage(100, stream); + long offset = stream.position(); assertEquals(valueContent.length + contentOffsetInPage, offset); - //should be able to read data correclty + // should be able to read data correctly for (long i : data) { assertEquals(i, reader.readLong()); } @@ -190,7 +193,7 @@ public class DeltaBinaryPackingValuesWriterForLongTest { } writeData(data); reader = new DeltaBinaryPackingValuesReader(); - reader.initFromPage(100, writer.getBytes().toByteBuffer(), 0); + reader.initFromPage(100, writer.getBytes().toInputStream()); for (int i = 0; i < data.length; i++) { if (i % 3 == 0) { reader.skip(); @@ -244,7 +247,7 @@ public class DeltaBinaryPackingValuesWriterForLongTest { + blockFlushed * miniBlockNum //bitWidth of mini blocks + (10.0 * blockFlushed);//min delta for each block assertTrue(estimatedSize >= page.length); - reader.initFromPage(100, page, 0); + 
reader.initFromPage(100, ByteBufferInputStream.wrap(ByteBuffer.wrap(page))); for (int i = 0; i < length; i++) { assertEquals(data[i], reader.readLong()); http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java index 488208c..ba5d771 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Random; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.DirectByteBufferAllocator; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.ValuesWriter; @@ -91,7 +92,7 @@ public class BenchmarkReadingRandomIntegers { } private void readData(ValuesReader reader, byte[] deltaBytes) throws IOException { - reader.initFromPage(data.length, ByteBuffer.wrap(deltaBytes), 0); + reader.initFromPage(data.length, ByteBufferInputStream.wrap(ByteBuffer.wrap(deltaBytes))); for (int i = 0; i < data.length; i++) { reader.readInteger(); } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java 
b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java index d7ebee5..d214a88 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java @@ -43,7 +43,7 @@ public class TestDeltaLengthByteArray { DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader(); Utils.writeData(writer, values); - Binary[] bin = Utils.readData(reader, writer.getBytes().toByteArray(), values.length); + Binary[] bin = Utils.readData(reader, writer.getBytes().toInputStream(), values.length); for(int i =0; i< bin.length ; i++) { Assert.assertEquals(Binary.fromString(values[i]), bin[i]); @@ -57,7 +57,7 @@ public class TestDeltaLengthByteArray { String[] values = Utils.getRandomStringSamples(1000, 32); Utils.writeData(writer, values); - Binary[] bin = Utils.readData(reader, writer.getBytes().toByteArray(), values.length); + Binary[] bin = Utils.readData(reader, writer.getBytes().toInputStream(), values.length); for(int i =0; i< bin.length ; i++) { Assert.assertEquals(Binary.fromString(values[i]), bin[i]); @@ -70,7 +70,7 @@ public class TestDeltaLengthByteArray { ValuesReader reader = new DeltaBinaryPackingValuesReader(); Utils.writeData(writer, values); - int[] bin = Utils.readInts(reader, writer.getBytes().toByteArray(), values.length); + int[] bin = Utils.readInts(reader, writer.getBytes().toInputStream(), values.length); for(int i =0; i< bin.length ; i++) { Assert.assertEquals(values[i].length(), bin[i]); http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java ---------------------------------------------------------------------- diff --git 
a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java index 69c5e15..08d04e6 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java @@ -20,6 +20,7 @@ package org.apache.parquet.column.values.deltalengthbytearray.benchmark; import java.io.IOException; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.junit.Rule; import org.junit.Test; @@ -52,9 +53,9 @@ public class BenchmarkDeltaLengthByteArray { BinaryPlainValuesReader reader = new BinaryPlainValuesReader(); Utils.writeData(writer, values); - byte [] data = writer.getBytes().toByteArray(); + ByteBufferInputStream data = writer.getBytes().toInputStream(); Binary[] bin = Utils.readData(reader, data, values.length); - System.out.println("size " + data.length); + System.out.println("size " + data.position()); } @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) @@ -64,9 +65,9 @@ public class BenchmarkDeltaLengthByteArray { DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader(); Utils.writeData(writer, values); - byte [] data = writer.getBytes().toByteArray(); + ByteBufferInputStream data = writer.getBytes().toInputStream(); Binary[] bin = Utils.readData(reader, data, values.length); - System.out.println("size " + data.length); + System.out.println("size " + data.position()); } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java ---------------------------------------------------------------------- diff --git 
a/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java index 4f8f40c..c13a3a2 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java @@ -21,6 +21,7 @@ package org.apache.parquet.column.values.deltastrings; import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.junit.Test; import org.junit.Assert; @@ -63,7 +64,7 @@ public class TestDeltaByteArray { ValuesReader reader = new DeltaBinaryPackingValuesReader(); Utils.writeData(writer, values); - byte[] data = writer.getBytes().toByteArray(); + ByteBufferInputStream data = writer.getBytes().toInputStream(); int[] bin = Utils.readInts(reader, data, values.length); // test prefix lengths @@ -71,9 +72,8 @@ public class TestDeltaByteArray { Assert.assertEquals(7, bin[1]); Assert.assertEquals(7, bin[2]); - int offset = reader.getNextOffset(); reader = new DeltaBinaryPackingValuesReader(); - bin = Utils.readInts(reader, writer.getBytes().toByteArray(), offset, values.length); + bin = Utils.readInts(reader, data, values.length); // test suffix lengths Assert.assertEquals(10, bin[0]); Assert.assertEquals(0, bin[1]); @@ -82,7 +82,7 @@ public class TestDeltaByteArray { private void assertReadWrite(DeltaByteArrayWriter writer, DeltaByteArrayReader reader, String[] vals) throws Exception { Utils.writeData(writer, vals); - Binary[] bin = Utils.readData(reader, writer.getBytes().toByteArray(), vals.length); + Binary[] bin = Utils.readData(reader, writer.getBytes().toInputStream(), vals.length); for(int i = 0; i< bin.length ; i++) { Assert.assertEquals(Binary.fromString(vals[i]), bin[i]); @@ -92,7 +92,7 @@ public class TestDeltaByteArray { private void 
assertReadWriteWithSkip(DeltaByteArrayWriter writer, DeltaByteArrayReader reader, String[] vals) throws Exception { Utils.writeData(writer, vals); - reader.initFromPage(vals.length, writer.getBytes().toByteBuffer(), 0); + reader.initFromPage(vals.length, writer.getBytes().toInputStream()); for (int i = 0; i < vals.length; i += 2) { Assert.assertEquals(Binary.fromString(vals[i]), reader.readBytes()); reader.skip(); http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java index eac4bd2..53578f0 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java @@ -21,6 +21,7 @@ package org.apache.parquet.column.values.deltastrings.benchmark; import java.io.IOException; import java.util.Arrays; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.junit.Rule; import org.junit.Test; @@ -59,9 +60,9 @@ public class BenchmarkDeltaByteArray { BinaryPlainValuesReader reader = new BinaryPlainValuesReader(); Utils.writeData(writer, values); - byte [] data = writer.getBytes().toByteArray(); + ByteBufferInputStream data = writer.getBytes().toInputStream(); Binary[] bin = Utils.readData(reader, data, values.length); - System.out.println("size " + data.length); + System.out.println("size " + data.position()); } @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) @@ -71,9 +72,9 @@ public class BenchmarkDeltaByteArray { DeltaByteArrayReader reader = new 
DeltaByteArrayReader(); Utils.writeData(writer, values); - byte [] data = writer.getBytes().toByteArray(); + ByteBufferInputStream data = writer.getBytes().toInputStream(); Binary[] bin = Utils.readData(reader, data, values.length); - System.out.println("size " + data.length); + System.out.println("size " + data.position()); } @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) @@ -83,9 +84,9 @@ public class BenchmarkDeltaByteArray { BinaryPlainValuesReader reader = new BinaryPlainValuesReader(); Utils.writeData(writer, sortedVals); - byte [] data = writer.getBytes().toByteArray(); + ByteBufferInputStream data = writer.getBytes().toInputStream(); Binary[] bin = Utils.readData(reader, data, values.length); - System.out.println("size " + data.length); + System.out.println("size " + data.position()); } @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) @@ -95,8 +96,8 @@ public class BenchmarkDeltaByteArray { DeltaByteArrayReader reader = new DeltaByteArrayReader(); Utils.writeData(writer, sortedVals); - byte [] data = writer.getBytes().toByteArray(); + ByteBufferInputStream data = writer.getBytes().toInputStream(); Binary[] bin = Utils.readData(reader, data, values.length); - System.out.println("size " + data.length); + System.out.println("size " + data.position()); } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java index ada1c93..cf66982 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java @@ -30,6 +30,7 @@ import 
java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.junit.Assert; import org.junit.Test; @@ -118,7 +119,7 @@ public class TestDictionary { //Fallbacked to Plain encoding, therefore use PlainValuesReader to read it back ValuesReader reader = new BinaryPlainValuesReader(); - reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0); + reader.initFromPage(100, cw.getBytes().toInputStream()); for (long i = 0; i < 100; i++) { assertEquals(Binary.fromString("str" + i), reader.readBytes()); @@ -204,13 +205,13 @@ public class TestDictionary { DictionaryValuesReader cr = initDicReader(cw, PrimitiveTypeName.INT64); - cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0); + cr.initFromPage(COUNT, bytes1.toInputStream()); for (long i = 0; i < COUNT; i++) { long back = cr.readLong(); assertEquals(i % 50, back); } - cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0); + cr.initFromPage(COUNT2, bytes2.toInputStream()); for (long i = COUNT2; i > 0; i--) { long back = cr.readLong(); assertEquals(i % 50, back); @@ -228,7 +229,7 @@ public class TestDictionary { } } - reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0); + reader.initFromPage(100, cw.getBytes().toInputStream()); for (long i = 0; i < 100; i++) { assertEquals(i, reader.readLong()); @@ -274,13 +275,13 @@ public class TestDictionary { final DictionaryValuesReader cr = initDicReader(cw, DOUBLE); - cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0); + cr.initFromPage(COUNT, bytes1.toInputStream()); for (double i = 0; i < COUNT; i++) { double back = cr.readDouble(); assertEquals(i % 50, back, 0.0); } - cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0); + cr.initFromPage(COUNT2, bytes2.toInputStream()); for (double i = COUNT2; i > 0; i--) { double back = cr.readDouble(); assertEquals(i % 50, back, 0.0); @@ -299,7 +300,7 @@ public class TestDictionary { } } - reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0); + 
reader.initFromPage(100, cw.getBytes().toInputStream()); for (double i = 0; i < 100; i++) { assertEquals(i, reader.readDouble(), 0.00001); @@ -345,13 +346,13 @@ public class TestDictionary { DictionaryValuesReader cr = initDicReader(cw, INT32); - cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0); + cr.initFromPage(COUNT, bytes1.toInputStream()); for (int i = 0; i < COUNT; i++) { int back = cr.readInteger(); assertEquals(i % 50, back); } - cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0); + cr.initFromPage(COUNT2, bytes2.toInputStream()); for (int i = COUNT2; i > 0; i--) { int back = cr.readInteger(); assertEquals(i % 50, back); @@ -370,7 +371,7 @@ public class TestDictionary { } } - reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0); + reader.initFromPage(100, cw.getBytes().toInputStream()); for (int i = 0; i < 100; i++) { assertEquals(i, reader.readInteger()); @@ -416,13 +417,13 @@ public class TestDictionary { DictionaryValuesReader cr = initDicReader(cw, FLOAT); - cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0); + cr.initFromPage(COUNT, bytes1.toInputStream()); for (float i = 0; i < COUNT; i++) { float back = cr.readFloat(); assertEquals(i % 50, back, 0.0f); } - cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0); + cr.initFromPage(COUNT2, bytes2.toInputStream()); for (float i = COUNT2; i > 0; i--) { float back = cr.readFloat(); assertEquals(i % 50, back, 0.0f); @@ -441,7 +442,7 @@ public class TestDictionary { } } - reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0); + reader.initFromPage(100, cw.getBytes().toInputStream()); for (float i = 0; i < 100; i++) { assertEquals(i, reader.readFloat(), 0.00001); @@ -476,8 +477,9 @@ public class TestDictionary { // pretend there are 100 nulls. what matters is offset = bytes.length. 
ByteBuffer bytes = ByteBuffer.wrap(new byte[] {0x00, 0x01, 0x02, 0x03}); // data doesn't matter - int offset = bytes.remaining(); - reader.initFromPage(100, bytes, offset); + ByteBufferInputStream stream = ByteBufferInputStream.wrap(bytes); + stream.skipFully(stream.available()); + reader.initFromPage(100, stream); } private DictionaryValuesReader initDicReader(ValuesWriter cw, PrimitiveTypeName type) @@ -490,14 +492,14 @@ public class TestDictionary { } private void checkDistinct(int COUNT, BytesInput bytes, ValuesReader cr, String prefix) throws IOException { - cr.initFromPage(COUNT, bytes.toByteBuffer(), 0); + cr.initFromPage(COUNT, bytes.toInputStream()); for (int i = 0; i < COUNT; i++) { Assert.assertEquals(prefix + i, cr.readBytes().toStringUsingUTF8()); } } private void checkRepeated(int COUNT, BytesInput bytes, ValuesReader cr, String prefix) throws IOException { - cr.initFromPage(COUNT, bytes.toByteBuffer(), 0); + cr.initFromPage(COUNT, bytes.toInputStream()); for (int i = 0; i < COUNT; i++) { Assert.assertEquals(prefix + i % 10, cr.readBytes().toStringUsingUTF8()); } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java index 712fb27..173d8fa 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java @@ -72,7 +72,7 @@ public class RunLengthBitPackingHybridIntegrationTest { numValues += 1000; ByteBuffer encodedBytes = encoder.toBytes().toByteBuffer(); 
- ByteBufferInputStream in = new ByteBufferInputStream(encodedBytes); + ByteBufferInputStream in = ByteBufferInputStream.wrap(encodedBytes); RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in); http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java index 5696d7b..dd329c0 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java @@ -21,14 +21,12 @@ package org.apache.parquet.column.values.rle; import static org.junit.Assert.assertEquals; import java.io.ByteArrayInputStream; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.junit.Test; -import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.DirectByteBufferAllocator; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.values.bitpacking.BytePacker; http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java ---------------------------------------------------------------------- diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java index 5b3b853..fc92b6b 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java +++ 
b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java @@ -16,67 +16,57 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.parquet.bytes; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; -/** - * This ByteBufferInputStream does not consume the ByteBuffer being passed in, - * but will create a slice of the current buffer. - */ -public class ByteBufferInputStream extends InputStream { - - protected ByteBuffer byteBuf; - protected int initPos; - protected int count; - public ByteBufferInputStream(ByteBuffer buffer) { - this(buffer, buffer.position(), buffer.remaining()); - } - - public ByteBufferInputStream(ByteBuffer buffer, int offset, int count) { - ByteBuffer temp = buffer.duplicate(); - temp.position(offset); - byteBuf = temp.slice(); - byteBuf.limit(count); - this.initPos = offset; - this.count = count; - } - - public ByteBuffer toByteBuffer() { - return byteBuf.slice(); +public abstract class ByteBufferInputStream extends InputStream { + + public static ByteBufferInputStream wrap(ByteBuffer... 
buffers) { + if (buffers.length == 1) { + return new SingleBufferInputStream(buffers[0]); + } else { + return new MultiBufferInputStream(Arrays.asList(buffers)); + } } - - @Override - public int read() throws IOException { - if (!byteBuf.hasRemaining()) { - return -1; + + public static ByteBufferInputStream wrap(List<ByteBuffer> buffers) { + if (buffers.size() == 1) { + return new SingleBufferInputStream(buffers.get(0)); + } else { + return new MultiBufferInputStream(buffers); } - //Workaround for unsigned byte - return byteBuf.get() & 0xFF; } - @Override - public int read(byte[] bytes, int offset, int length) throws IOException { - int count = Math.min(byteBuf.remaining(), length); - if (count == 0) return -1; - byteBuf.get(bytes, offset, count); - return count; + public abstract long position(); + + public void skipFully(long n) throws IOException { + long skipped = skip(n); + if (skipped < n) { + throw new EOFException( + "Not enough bytes to skip: " + skipped + " < " + n); + } } - - @Override - public long skip(long n) { - if (n > byteBuf.remaining()) - n = byteBuf.remaining(); - int pos = byteBuf.position(); - byteBuf.position((int)(pos + n)); - return n; + + public abstract int read(ByteBuffer out); + + public abstract ByteBuffer slice(int length) throws EOFException; + + public abstract List<ByteBuffer> sliceBuffers(long length) throws EOFException; + + public ByteBufferInputStream sliceStream(long length) throws EOFException { + return ByteBufferInputStream.wrap(sliceBuffers(length)); } + public abstract List<ByteBuffer> remainingBuffers(); - @Override - public int available() { - return byteBuf.remaining(); + public ByteBufferInputStream remainingStream() { + return ByteBufferInputStream.wrap(remainingBuffers()); } }