HIVE-13255: FloatTreeReader.nextVector is expensive (Prasanth Jayachandran reviewed by Gopal V)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8225cb6a Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8225cb6a Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8225cb6a Branch: refs/heads/llap Commit: 8225cb6aedba7e49515da44f092405994f9a22b6 Parents: 4008845 Author: Prasanth Jayachandran <prasan...@apache.org> Authored: Thu Mar 31 02:48:01 2016 -0700 Committer: Prasanth Jayachandran <prasan...@apache.org> Committed: Thu Mar 31 02:48:01 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/orc/impl/IntegerReader.java | 4 +- .../apache/orc/impl/RunLengthIntegerReader.java | 7 +-- .../orc/impl/RunLengthIntegerReaderV2.java | 7 +-- .../org/apache/orc/impl/SerializationUtils.java | 34 ++++++++++----- .../apache/orc/impl/TestSerializationUtils.java | 45 +++++++++++++++++-- .../hadoop/hive/ql/io/orc/RecordReaderImpl.java | 16 +++---- .../hive/ql/io/orc/TreeReaderFactory.java | 46 ++++++++++---------- 7 files changed, 99 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/8225cb6a/orc/src/java/org/apache/orc/impl/IntegerReader.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/IntegerReader.java b/orc/src/java/org/apache/orc/impl/IntegerReader.java index b928559..7dfd289 100644 --- a/orc/src/java/org/apache/orc/impl/IntegerReader.java +++ b/orc/src/java/org/apache/orc/impl/IntegerReader.java @@ -60,8 +60,6 @@ public interface IntegerReader { * @return * @throws IOException */ - void nextVector(LongColumnVector previous, long previousLen) + void nextVector(LongColumnVector previous, final int previousLen) throws IOException; - - void setInStream(InStream data); } http://git-wip-us.apache.org/repos/asf/hive/blob/8225cb6a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java index f129c86..0c90cde 100644 --- a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java +++ b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java @@ -99,7 +99,7 @@ public class RunLengthIntegerReader implements IntegerReader { } @Override - public void nextVector(LongColumnVector previous, long previousLen) throws IOException { + public void nextVector(LongColumnVector previous, final int previousLen) throws IOException { previous.isRepeating = true; for (int i = 0; i < previousLen; i++) { if (!previous.isNull[i]) { @@ -122,11 +122,6 @@ public class RunLengthIntegerReader implements IntegerReader { } @Override - public void setInStream(InStream data) { - input = data; - } - - @Override public void seek(PositionProvider index) throws IOException { input.seek(index); int consumed = (int) index.getNext(); http://git-wip-us.apache.org/repos/asf/hive/blob/8225cb6a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java index 5f2a673..c6d685a 100644 --- a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java +++ b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java @@ -360,7 +360,7 @@ public class RunLengthIntegerReaderV2 implements IntegerReader { } @Override - public void nextVector(LongColumnVector previous, long previousLen) throws IOException { + public void nextVector(LongColumnVector previous, final int previousLen) throws IOException { previous.isRepeating = true; for (int i = 0; i < previousLen; i++) { if (!previous.isNull[i]) { @@ -382,9 +382,4 @@ public class RunLengthIntegerReaderV2 implements IntegerReader { } } } - - @Override - public void setInStream(InStream data) { - input = data; - } } http://git-wip-us.apache.org/repos/asf/hive/blob/8225cb6a/orc/src/java/org/apache/orc/impl/SerializationUtils.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/SerializationUtils.java b/orc/src/java/org/apache/orc/impl/SerializationUtils.java index c1162e4..2e5a59b 100644 --- a/orc/src/java/org/apache/orc/impl/SerializationUtils.java +++ b/orc/src/java/org/apache/orc/impl/SerializationUtils.java @@ -18,8 +18,6 @@ package org.apache.orc.impl; -import org.apache.orc.impl.InStream; - import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -77,18 +75,22 @@ public final class SerializationUtils { } public float readFloat(InputStream in) throws IOException { - int ser = in.read() | (in.read() << 8) | (in.read() << 16) | - (in.read() << 24); - return Float.intBitsToFloat(ser); + readFully(in, readBuffer, 0, 4); + int val = (((readBuffer[0] & 0xff) << 0) + + ((readBuffer[1] & 0xff) << 8) + + ((readBuffer[2] & 0xff) << 16) + + ((readBuffer[3] & 0xff) << 24)); + return Float.intBitsToFloat(val); } public void writeFloat(OutputStream output, float value) throws IOException { int ser = Float.floatToIntBits(value); - output.write(ser & 0xff); - output.write((ser >> 8) & 0xff); - output.write((ser >> 16) & 0xff); - output.write((ser >> 24) & 0xff); + writeBuffer[0] = (byte) ((ser >> 0) & 0xff); + writeBuffer[1] = (byte) ((ser >> 8) & 0xff); + writeBuffer[2] = (byte) ((ser >> 16) & 0xff); + writeBuffer[3] = (byte) ((ser >> 24) & 0xff); + output.write(writeBuffer, 0, 4); } public double readDouble(InputStream in) throws IOException { @@ -96,7 +98,7 @@ public final class SerializationUtils { } public long readLongLE(InputStream in) throws IOException { - in.read(readBuffer, 0, 8); + readFully(in, readBuffer, 0, 8); return (((readBuffer[0] & 0xff) << 0) + ((readBuffer[1] & 0xff) << 8) + ((readBuffer[2] & 0xff) << 16) @@ -107,6 +109,18 @@ public final class SerializationUtils { + ((long) (readBuffer[7] & 0xff) << 56)); } + private void readFully(final InputStream in, final byte[] buffer, final int off, final int len) + throws IOException { + int n = 0; + while (n < len) { + int count = in.read(buffer, off + n, len - n); + if (count < 0) { + throw new EOFException("Read past EOF for " + in); + } + n += count; + } + } + public void writeDouble(OutputStream output, double value) throws IOException { writeLongLE(output, Double.doubleToLongBits(value)); http://git-wip-us.apache.org/repos/asf/hive/blob/8225cb6a/orc/src/test/org/apache/orc/impl/TestSerializationUtils.java ---------------------------------------------------------------------- diff --git a/orc/src/test/org/apache/orc/impl/TestSerializationUtils.java b/orc/src/test/org/apache/orc/impl/TestSerializationUtils.java index 0785412..4a8a0f2 100644 --- a/orc/src/test/org/apache/orc/impl/TestSerializationUtils.java +++ b/orc/src/test/org/apache/orc/impl/TestSerializationUtils.java @@ -25,6 +25,9 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.InputStream; import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; import org.junit.Test; @@ -156,9 +159,43 @@ public class TestSerializationUtils { assertEquals(Long.MIN_VALUE, LongMath.checkedSubtract(Long.MIN_VALUE, 0)); } - public static void main(String[] args) throws Exception { - TestSerializationUtils test = new TestSerializationUtils(); - test.testDoubles(); - test.testBigIntegers(); + @Test + public void testRandomFloats() throws Exception { + float tolerance = 0.0000000000000001f; + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + SerializationUtils utils = new SerializationUtils(); + Random rand = new Random(); + int n = 100_000; + float[] expected = new float[n]; + for (int i = 0; i < n; i++) { + float f = rand.nextFloat(); + expected[i] = f; + utils.writeFloat(buffer, f); + } + InputStream newBuffer = fromBuffer(buffer); + for (int i = 0; i < n; i++) { + float got = utils.readFloat(newBuffer); + assertEquals(expected[i], got, tolerance); + } + } + + @Test + public void testRandomDoubles() throws Exception { + double tolerance = 0.0000000000000001; + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + SerializationUtils utils = new SerializationUtils(); + Random rand = new Random(); + int n = 100_000; + double[] expected = new double[n]; + for (int i = 0; i < n; i++) { + double d = rand.nextDouble(); + expected[i] = d; + utils.writeDouble(buffer, d); + } + InputStream newBuffer = fromBuffer(buffer); + for (int i = 0; i < n; i++) { + double got = utils.readDouble(newBuffer); + assertEquals(expected[i], got, tolerance); + } } } http://git-wip-us.apache.org/repos/asf/hive/blob/8225cb6a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java index aa835ae..3975409 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java @@ -1060,7 +1060,7 @@ public class RecordReaderImpl implements RecordReader { readStripe(); } - long batchSize = computeBatchSize(VectorizedRowBatch.DEFAULT_SIZE); + final int batchSize = computeBatchSize(VectorizedRowBatch.DEFAULT_SIZE); rowInStripe += batchSize; if (previous == null) { @@ -1068,13 +1068,13 @@ public class RecordReaderImpl implements RecordReader { result = new VectorizedRowBatch(cols.length); result.cols = cols; } else { - result = (VectorizedRowBatch) previous; + result = previous; result.selectedInUse = false; reader.setVectorColumnCount(result.getDataColumnCount()); - reader.nextVector(result.cols, (int) batchSize); + reader.nextVector(result.cols, batchSize); } - result.size = (int) batchSize; + result.size = batchSize; advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true); return result; } catch (IOException e) { @@ -1083,8 +1083,8 @@ public class RecordReaderImpl implements RecordReader { } } - private long computeBatchSize(long targetBatchSize) { - long batchSize = 0; + private int computeBatchSize(long targetBatchSize) { + final int batchSize; // In case of PPD, batch size should be aware of row group boundaries. If only a subset of row // groups are selected then marker position is set to the end of range (subset of row groups // within strip). Batch size computed out of marker position makes sure that batch size is @@ -1106,13 +1106,13 @@ public class RecordReaderImpl implements RecordReader { final long markerPosition = (endRowGroup * rowIndexStride) < rowCountInStripe ? (endRowGroup * rowIndexStride) : rowCountInStripe; - batchSize = Math.min(targetBatchSize, (markerPosition - rowInStripe)); + batchSize = (int) Math.min(targetBatchSize, (markerPosition - rowInStripe)); if (isLogDebugEnabled && batchSize < targetBatchSize) { LOG.debug("markerPosition: " + markerPosition + " batchSize: " + batchSize); } } else { - batchSize = Math.min(targetBatchSize, (rowCountInStripe - rowInStripe)); + batchSize = (int) Math.min(targetBatchSize, (rowCountInStripe - rowInStripe)); } return batchSize; } http://git-wip-us.apache.org/repos/asf/hive/blob/8225cb6a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java index 620ad53..d74a854 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java @@ -239,7 +239,7 @@ public class TreeReaderFactory { * @return next column vector * @throws IOException */ - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { ColumnVector result = (ColumnVector) previousVector; if (present != null) { // Set noNulls and isNull vector of the ColumnVector based on @@ -322,7 +322,7 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { final LongColumnVector result; if (previousVector == null) { result = new LongColumnVector(); @@ -387,7 +387,7 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { final LongColumnVector result; if (previousVector == null) { result = new LongColumnVector(); @@ -473,7 +473,7 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { final LongColumnVector result; if (previousVector == null) { result = new LongColumnVector(); @@ -559,7 +559,7 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { final LongColumnVector result; if (previousVector == null) { result = new LongColumnVector(); @@ -646,7 +646,7 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { final LongColumnVector result; if (previousVector == null) { result = new LongColumnVector(); @@ -719,7 +719,7 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { final DoubleColumnVector result; if (previousVector == null) { result = new DoubleColumnVector(); @@ -832,7 +832,7 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { final DoubleColumnVector result; if (previousVector == null) { result = new DoubleColumnVector(); @@ -974,7 +974,7 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { final BytesColumnVector result; if (previousVector == null) { result = new BytesColumnVector(); @@ -1144,7 +1144,7 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { final TimestampColumnVector result; if (previousVector == null) { result = new TimestampColumnVector(); @@ -1253,7 +1253,7 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { final LongColumnVector result; if (previousVector == null) { result = new LongColumnVector(); @@ -1352,7 +1352,7 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { final DecimalColumnVector result; if (previousVector == null) { result = new DecimalColumnVector(precision, scale); @@ -1481,7 +1481,7 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { return reader.nextVector(previousVector, batchSize); } @@ -1498,7 +1498,7 @@ public class TreeReaderFactory { private static byte[] commonReadByteArrays(InStream stream, IntegerReader lengths, LongColumnVector scratchlcv, - BytesColumnVector result, long batchSize) throws IOException { + BytesColumnVector result, final int batchSize) throws IOException { // Read lengths scratchlcv.isNull = result.isNull; // Notice we are replacing the isNull vector here... lengths.nextVector(scratchlcv, batchSize); @@ -1534,7 +1534,7 @@ public class TreeReaderFactory { // This method has the common code for reading in bytes into a BytesColumnVector. public static void readOrcByteArrays(InStream stream, IntegerReader lengths, LongColumnVector scratchlcv, - BytesColumnVector result, long batchSize) throws IOException { + BytesColumnVector result, final int batchSize) throws IOException { byte[] allBytes = commonReadByteArrays(stream, lengths, scratchlcv, result, batchSize); @@ -1641,7 +1641,7 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { final BytesColumnVector result; if (previousVector == null) { result = new BytesColumnVector(); @@ -1815,7 +1815,7 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { final BytesColumnVector result; int offset; int length; @@ -1926,7 +1926,7 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { // Get the vector of strings from StringTreeReader, then make a 2nd pass to // adjust down the length (right trim and truncate) if necessary. BytesColumnVector result = (BytesColumnVector) super.nextVector(previousVector, batchSize); @@ -2000,7 +2000,7 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { // Get the vector of strings from StringTreeReader, then make a 2nd pass to // adjust down the length (truncate) if necessary. BytesColumnVector result = (BytesColumnVector) super.nextVector(previousVector, batchSize); @@ -2137,7 +2137,7 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { final ColumnVector[] result; if (previousVector == null) { result = new ColumnVector[readColumnCount]; @@ -2242,7 +2242,7 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, long batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { throw new UnsupportedOperationException( "NextVector is not supported operation for Union type"); } @@ -2325,7 +2325,7 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previous, long batchSize) throws IOException { + public Object nextVector(Object previous, final int batchSize) throws IOException { throw new UnsupportedOperationException( "NextVector is not supported operation for List type"); } @@ -2419,7 +2419,7 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previous, long batchSize) throws IOException { + public Object nextVector(Object previous, final int batchSize) throws IOException { throw new UnsupportedOperationException( "NextVector is not supported operation for Map type"); }