HIVE-13878: Vectorization: Column pruning for Text vectorization (Matt McCline, reviewed by Gopal Vijayaraghavan)
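This change replaces DeserializeRead.readCheckNull(), together with setColumnsToInclude(), extraFieldsCheck(), and the read-integrity warning flags, with readNextField(), skipNextField(), an optional random-access readField(int) (per the patch, currently supported only by LazySimpleDeserializeRead), and isEndOfInputReached(), so that VectorDeserializeRow can skip or jump over pruned columns. A minimal sketch of a caller under the new contract follows; the driver loop is illustrative only and is not code from this commit:

    import java.io.IOException;
    import org.apache.hadoop.hive.serde2.fast.DeserializeRead;

    // Hypothetical driver showing the patched field-by-field contract.
    final class ReadLoopSketch {
      static void readRow(DeserializeRead reader, boolean[] included) throws IOException {
        for (int i = 0; i < included.length; i++) {
          if (!included[i]) {
            reader.skipNextField();           // read through columns we do not want
          } else if (reader.readNextField()) {
            // non-null: the value is in the matching reader.current* member
          } else {
            // null: column i has no value in this row
          }
        }
      }
    }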
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0b62e6f3 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0b62e6f3 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0b62e6f3 Branch: refs/heads/master Commit: 0b62e6f38788de81816abacf025d61bbc80d75fa Parents: ff67cdd Author: Matt McCline <mmccl...@hortonworks.com> Authored: Tue Sep 13 23:15:56 2016 -0700 Committer: Matt McCline <mmccl...@hortonworks.com> Committed: Tue Sep 13 23:15:56 2016 -0700 ---------------------------------------------------------------------- .../ql/exec/vector/VectorDeserializeRow.java | 238 +++--- .../hive/ql/exec/vector/VectorMapOperator.java | 22 +- .../fast/VectorMapJoinFastLongHashTable.java | 2 +- .../fast/VectorMapJoinFastStringCommon.java | 2 +- .../VectorMapJoinOptimizedLongCommon.java | 56 -- .../VectorMapJoinOptimizedStringCommon.java | 26 - .../hive/ql/optimizer/physical/Vectorizer.java | 25 +- .../hive/ql/exec/vector/TestVectorSerDeRow.java | 14 +- .../mapjoin/fast/CheckFastRowHashMap.java | 10 +- .../exec/vector/mapjoin/fast/VerifyFastRow.java | 2 +- .../fast/BinarySortableDeserializeRead.java | 132 ++-- .../hive/serde2/fast/DeserializeRead.java | 71 +- .../lazy/fast/LazySimpleDeserializeRead.java | 770 ++++++++++--------- .../fast/LazyBinaryDeserializeRead.java | 119 +-- .../apache/hadoop/hive/serde2/VerifyFast.java | 2 +- .../binarysortable/TestBinarySortableFast.java | 35 +- .../hive/serde2/lazy/TestLazySimpleFast.java | 31 +- .../serde2/lazybinary/TestLazyBinaryFast.java | 32 +- 18 files changed, 787 insertions(+), 802 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/0b62e6f3/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java index 47bef43..d31d338 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.vector; import java.io.EOFException; import java.io.IOException; +import java.util.Arrays; import java.util.List; import org.slf4j.Logger; @@ -97,20 +98,27 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { * We say "source" because when there is conversion we are converting th deserialized source into * a target data type. */ - boolean[] isConvert; + + private boolean useReadField; + // True when the (random access) readField method of DeserializeRead are being used. + + private int[] readFieldLogicalIndices; + // The logical indices for reading with readField. + + private boolean[] isConvert; // For each column, are we converting the row column? - int[] projectionColumnNums; + private int[] projectionColumnNums; // Assigning can be a subset of columns, so this is the projection -- // the batch column numbers. - Category[] sourceCategories; + private Category[] sourceCategories; // The data type category of each column being deserialized. - PrimitiveCategory[] sourcePrimitiveCategories; + private PrimitiveCategory[] sourcePrimitiveCategories; //The data type primitive category of each column being deserialized. 
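// Illustrative example (assumed values, not from this patch): with four source
// columns of which only columns 0 and 2 are included, init() leaves
// projectionColumnNums as {0, -1, 2, -1}, and when the DeserializeRead supports
// random access it sets useReadField = true with readFieldLogicalIndices = {0, 2}.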
- int[] maxLengths; + private int[] maxLengths; // For the CHAR and VARCHAR data types, the maximum character length of // the columns. Otherwise, 0. @@ -131,6 +139,7 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { private void allocateArrays(int count) { isConvert = new boolean[count]; projectionColumnNums = new int[count]; + Arrays.fill(projectionColumnNums, -1); sourceCategories = new Category[count]; sourcePrimitiveCategories = new PrimitiveCategory[count]; maxLengths = new int[count]; @@ -231,14 +240,18 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { public void init(boolean[] columnsToIncludeTruncated) throws HiveException { - if (columnsToIncludeTruncated != null) { - deserializeRead.setColumnsToInclude(columnsToIncludeTruncated); - } + // When truncated included is used, its length must be at least the number of source type infos. + // When longer, we assume the caller will default with nulls, etc. + Preconditions.checkState( + columnsToIncludeTruncated == null || + columnsToIncludeTruncated.length == sourceTypeInfos.length); - final int columnCount = (columnsToIncludeTruncated == null ? - sourceTypeInfos.length : columnsToIncludeTruncated.length); + final int columnCount = sourceTypeInfos.length; allocateArrays(columnCount); + int includedCount = 0; + int[] includedIndices = new int[columnCount]; + for (int i = 0; i < columnCount; i++) { if (columnsToIncludeTruncated != null && !columnsToIncludeTruncated[i]) { @@ -248,9 +261,16 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { } else { initSourceEntry(i, i, sourceTypeInfos[i]); - + includedIndices[includedCount++] = i; } } + + // Optimizing for readField? + if (includedCount < columnCount && deserializeRead.isReadFieldSupported()) { + useReadField = true; + readFieldLogicalIndices = Arrays.copyOf(includedIndices, includedCount); + } + } /** @@ -258,37 +278,33 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { * DeserializedRead interface passed to the constructor to the target data types desired in * the VectorizedRowBatch. * - * No projection -- the column range 0 .. count-1 - * - * where count is the minimum of the target data type array size, included array size, - * and source data type array size. + * No projection -- using the column range 0 .. columnCount-1 * * @param targetTypeInfos * @param columnsToIncludeTruncated - * @return the minimum count described above is returned. That is, the number of columns - * that will be processed by deserialize. * @throws HiveException */ - public int initConversion(TypeInfo[] targetTypeInfos, + public void initConversion(TypeInfo[] targetTypeInfos, boolean[] columnsToIncludeTruncated) throws HiveException { - if (columnsToIncludeTruncated != null) { - deserializeRead.setColumnsToInclude(columnsToIncludeTruncated); - } + // We assume the caller will handle extra columns default with nulls, etc. + Preconditions.checkState(targetTypeInfos.length >= sourceTypeInfos.length); - int targetColumnCount; - if (columnsToIncludeTruncated == null) { - targetColumnCount = targetTypeInfos.length; - } else { - targetColumnCount = Math.min(targetTypeInfos.length, columnsToIncludeTruncated.length); - } + // When truncated included is used, its length must be at least the number of source type infos. + // When longer, we assume the caller will default with nulls, etc. 
+ Preconditions.checkState( + columnsToIncludeTruncated == null || + columnsToIncludeTruncated.length >= sourceTypeInfos.length); - int sourceColumnCount = Math.min(sourceTypeInfos.length, targetColumnCount); - allocateArrays(sourceColumnCount); - allocateConvertArrays(sourceColumnCount); + final int columnCount = sourceTypeInfos.length; + allocateArrays(columnCount); + allocateConvertArrays(columnCount); + + int includedCount = 0; + int[] includedIndices = new int[columnCount]; boolean atLeastOneConvert = false; - for (int i = 0; i < sourceColumnCount; i++) { + for (int i = 0; i < columnCount; i++) { if (columnsToIncludeTruncated != null && !columnsToIncludeTruncated[i]) { @@ -320,9 +336,17 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { initSourceEntry(i, i, sourceTypeInfo); } + + includedIndices[includedCount++] = i; } } + // Optimizing for readField? + if (includedCount < columnCount && deserializeRead.isReadFieldSupported()) { + useReadField = true; + readFieldLogicalIndices = Arrays.copyOf(includedIndices, includedCount); + } + if (atLeastOneConvert) { // Let the VectorAssignRow class do the conversion. @@ -330,8 +354,6 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { convertVectorAssignRow.initConversion(sourceTypeInfos, targetTypeInfos, columnsToIncludeTruncated); } - - return sourceColumnCount; } public void init() throws HiveException { @@ -339,7 +361,7 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { } /** - * Deserialize one row column value. + * Store one row column value that is the current value in deserializeRead. * * @param batch * @param batchIndex @@ -351,27 +373,11 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { * in a hash table entry that is immutable. * @throws IOException */ - private void deserializeRowColumn(VectorizedRowBatch batch, int batchIndex, + private void storeRowColumn(VectorizedRowBatch batch, int batchIndex, int logicalColumnIndex, boolean canRetainByteRef) throws IOException { - Category sourceCategory = sourceCategories[logicalColumnIndex]; - if (sourceCategory == null) { - /* - * This is a column that we don't want (i.e. not included). - * The deserializeRead.readCheckNull() will read the field. - */ - boolean isNull = deserializeRead.readCheckNull(); - Preconditions.checkState(isNull); - return; - } final int projectionColumnNum = projectionColumnNums[logicalColumnIndex]; - if (deserializeRead.readCheckNull()) { - VectorizedBatchUtil.setNullColIsNullValue(batch.cols[projectionColumnNum], batchIndex); - return; - } - - // We have a value for the row column. - switch (sourceCategory) { + switch (sourceCategories[logicalColumnIndex]) { case PRIMITIVE: { PrimitiveCategory sourcePrimitiveCategory = sourcePrimitiveCategories[logicalColumnIndex]; @@ -546,7 +552,7 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { } break; default: - throw new RuntimeException("Category " + sourceCategory.name() + " not supported"); + throw new RuntimeException("Category " + sourceCategories[logicalColumnIndex] + " not supported"); } // We always set the null flag to false when there is a value. @@ -554,7 +560,7 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { } /** - * Deserialize and convert one row column value. + * Convert one row column value that is the current value in deserializeRead. 
* * We deserialize into a writable and then pass that writable to an instance of VectorAssignRow * to convert the writable to the target data type and assign it into the VectorizedRowBatch. @@ -564,32 +570,14 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { * @param logicalColumnIndex * @throws IOException */ - private void deserializeConvertRowColumn(VectorizedRowBatch batch, int batchIndex, + private void convertRowColumn(VectorizedRowBatch batch, int batchIndex, int logicalColumnIndex) throws IOException { - Category sourceCategory = sourceCategories[logicalColumnIndex]; - if (sourceCategory == null) { - /* - * This is a column that we don't want (i.e. not included). - * The deserializeRead.readCheckNull() will read the field. - */ - boolean isNull = deserializeRead.readCheckNull(); - Preconditions.checkState(isNull); - return; - } - final int projectionColumnNum = projectionColumnNums[logicalColumnIndex]; - if (deserializeRead.readCheckNull()) { - VectorizedBatchUtil.setNullColIsNullValue(batch.cols[projectionColumnNum], batchIndex); - return; - } - - // We have a value for the row column. Writable convertSourceWritable = convertSourceWritables[logicalColumnIndex]; - switch (sourceCategory) { + switch (sourceCategories[logicalColumnIndex]) { case PRIMITIVE: { - PrimitiveCategory sourcePrimitiveCategory = sourcePrimitiveCategories[logicalColumnIndex]; - switch (sourcePrimitiveCategory) { + switch (sourcePrimitiveCategories[logicalColumnIndex]) { case VOID: convertSourceWritable = null; break; @@ -702,13 +690,13 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { deserializeRead.currentHiveIntervalDayTimeWritable); break; default: - throw new RuntimeException("Primitive category " + sourcePrimitiveCategory.name() + + throw new RuntimeException("Primitive category " + sourcePrimitiveCategories[logicalColumnIndex] + " not supported"); } } break; default: - throw new RuntimeException("Category " + sourceCategory.name() + " not supported"); + throw new RuntimeException("Category " + sourceCategories[logicalColumnIndex] + " not supported"); } /* @@ -746,17 +734,51 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { * @throws IOException */ public void deserialize(VectorizedRowBatch batch, int batchIndex) throws IOException { + + // Pass false for canRetainByteRef since we will NOT be keeping byte references to the input + // bytes with the BytesColumnVector.setRef method. + final int count = isConvert.length; - for (int i = 0; i < count; i++) { - if (isConvert[i]) { - deserializeConvertRowColumn(batch, batchIndex, i); - } else { - // Pass false for canRetainByteRef since we will NOT be keeping byte references to the input - // bytes with the BytesColumnVector.setRef method. - deserializeRowColumn(batch, batchIndex, i, /* canRetainByteRef */ false); + if (!useReadField) { + for (int i = 0; i < count; i++) { + final int projectionColumnNum = projectionColumnNums[i]; + if (projectionColumnNum == -1) { + // We must read through fields we do not want. + deserializeRead.skipNextField(); + continue; + } + if (!deserializeRead.readNextField()) { + ColumnVector colVector = batch.cols[projectionColumnNum]; + colVector.isNull[batchIndex] = true; + colVector.noNulls = false; + continue; + } + // The current* members of deserializeRead have the field value. 
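// (Illustrative comment, assumed: isConvert[i] is true when initConversion found
// that the source type differs from the target batch type; those values are routed
// through convertRowColumn and the VectorAssignRow conversion, all others through
// storeRowColumn directly.)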
+ if (isConvert[i]) { + convertRowColumn(batch, batchIndex, i); + } else { + storeRowColumn(batch, batchIndex, i, /* canRetainByteRef */ false); + } + } + } else { + final int readFieldCount = readFieldLogicalIndices.length; + for (int i = 0; i < readFieldCount; i++) { + final int logicalIndex = readFieldLogicalIndices[i]; + // Jump to the field we want and read it. + if (!deserializeRead.readField(logicalIndex)) { + ColumnVector colVector = batch.cols[projectionColumnNums[logicalIndex]]; + colVector.isNull[batchIndex] = true; + colVector.noNulls = false; + continue; + } + // The current* members of deserializeRead have the field value. + if (isConvert[logicalIndex]) { + convertRowColumn(batch, batchIndex, logicalIndex); + } else { + storeRowColumn(batch, batchIndex, logicalIndex, /* canRetainByteRef */ false); + } } } - deserializeRead.extraFieldsCheck(); } /** @@ -781,16 +803,46 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { */ public void deserializeByRef(VectorizedRowBatch batch, int batchIndex) throws IOException { final int count = isConvert.length; - for (int i = 0; i < count; i++) { - if (isConvert[i]) { - deserializeConvertRowColumn(batch, batchIndex, i); - } else { - // Pass true for canRetainByteRef since we will be keeping byte references to the input - // bytes with the BytesColumnVector.setRef method. - deserializeRowColumn(batch, batchIndex, i, /* canRetainByteRef */ true); + if (!useReadField) { + for (int i = 0; i < count; i++) { + final int projectionColumnNum = projectionColumnNums[i]; + if (projectionColumnNum == -1) { + // We must read through fields we do not want. + deserializeRead.skipNextField(); + continue; + } + if (!deserializeRead.readNextField()) { + ColumnVector colVector = batch.cols[projectionColumnNum]; + colVector.isNull[batchIndex] = true; + colVector.noNulls = false; + continue; + } + // The current* members of deserializeRead have the field value. + if (isConvert[i]) { + convertRowColumn(batch, batchIndex, i); + } else { + storeRowColumn(batch, batchIndex, i, /* canRetainByteRef */ true); + } + } + } else { + final int readFieldCount = readFieldLogicalIndices.length; + for (int i = 0; i < readFieldCount; i++) { + final int logicalIndex = readFieldLogicalIndices[i]; + // Jump to the field we want and read it. + if (!deserializeRead.readField(logicalIndex)) { + ColumnVector colVector = batch.cols[projectionColumnNums[logicalIndex]]; + colVector.isNull[batchIndex] = true; + colVector.noNulls = false; + continue; + } + // The current* members of deserializeRead have the field value. + if (isConvert[logicalIndex]) { + convertRowColumn(batch, batchIndex, logicalIndex); + } else { + storeRowColumn(batch, batchIndex, logicalIndex, /* canRetainByteRef */ true); + } } } - deserializeRead.extraFieldsCheck(); } http://git-wip-us.apache.org/repos/asf/hive/blob/0b62e6f3/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java index c7fa0db..323419c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java @@ -253,6 +253,20 @@ public class VectorMapOperator extends AbstractMapOperator { // This type information specifies the data types the partition needs to read. 
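// Worked example (assumed, not from this patch): for a 20-column text table where
// the query reads only the first 3 columns, dataColumnsToIncludeTruncated has
// length 3, so minimalDataTypeInfos below is dataTypeInfos truncated to 3 entries
// and LazySimpleDeserializeRead's separator parser stops after the third field
// instead of scanning the rest of the line.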
TypeInfo[] dataTypeInfos = vectorPartDesc.getDataTypeInfos(); + // We need to provide the minimum number of columns to be read so + // LazySimpleDeserializeRead's separator parser does not waste time. + // + Preconditions.checkState(dataColumnsToIncludeTruncated != null); + TypeInfo[] minimalDataTypeInfos; + if (dataColumnsToIncludeTruncated.length < dataTypeInfos.length) { + minimalDataTypeInfos = + Arrays.copyOf(dataTypeInfos, dataColumnsToIncludeTruncated.length); + } else { + minimalDataTypeInfos = dataTypeInfos; + } + + readerColumnCount = minimalDataTypeInfos.length; + switch (vectorPartDesc.getVectorDeserializeType()) { case LAZY_SIMPLE: { @@ -262,7 +276,7 @@ public class VectorMapOperator extends AbstractMapOperator { LazySimpleDeserializeRead lazySimpleDeserializeRead = new LazySimpleDeserializeRead( - dataTypeInfos, + minimalDataTypeInfos, /* useExternalBuffer */ true, simpleSerdeParams); @@ -270,8 +284,7 @@ public class VectorMapOperator extends AbstractMapOperator { new VectorDeserializeRow<LazySimpleDeserializeRead>(lazySimpleDeserializeRead); // Initialize with data row type conversion parameters. - readerColumnCount = - vectorDeserializeRow.initConversion(tableRowTypeInfos, dataColumnsToIncludeTruncated); + vectorDeserializeRow.initConversion(tableRowTypeInfos, dataColumnsToIncludeTruncated); deserializeRead = lazySimpleDeserializeRead; } @@ -288,8 +301,7 @@ public class VectorMapOperator extends AbstractMapOperator { new VectorDeserializeRow<LazyBinaryDeserializeRead>(lazyBinaryDeserializeRead); // Initialize with data row type conversion parameters. - readerColumnCount = - vectorDeserializeRow.initConversion(tableRowTypeInfos, dataColumnsToIncludeTruncated); + vectorDeserializeRow.initConversion(tableRowTypeInfos, dataColumnsToIncludeTruncated); deserializeRead = lazyBinaryDeserializeRead; } http://git-wip-us.apache.org/repos/asf/hive/blob/0b62e6f3/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java index 726a937..bc892ba 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java @@ -79,7 +79,7 @@ public abstract class VectorMapJoinFastLongHashTable int keyLength = currentKey.getLength(); keyBinarySortableDeserializeRead.set(keyBytes, 0, keyLength); try { - if (keyBinarySortableDeserializeRead.readCheckNull()) { + if (!keyBinarySortableDeserializeRead.readNextField()) { return; } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/hive/blob/0b62e6f3/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java index 456e6ba..ab39e58 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java @@ -46,7 +46,7 @@ public 
class VectorMapJoinFastStringCommon { int keyLength = currentKey.getLength(); keyBinarySortableDeserializeRead.set(keyBytes, 0, keyLength); try { - if (keyBinarySortableDeserializeRead.readCheckNull()) { + if (!keyBinarySortableDeserializeRead.readNextField()) { return; } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/hive/blob/0b62e6f3/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedLongCommon.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedLongCommon.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedLongCommon.java index ac85899..6a9039f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedLongCommon.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedLongCommon.java @@ -65,62 +65,6 @@ public class VectorMapJoinOptimizedLongCommon { return max; } - /* - * For now, just use MapJoinBytesTableContainer / HybridHashTableContainer directly. - - public void adaptPutRow(VectorMapJoinOptimizedHashTable hashTable, - BytesWritable currentKey, BytesWritable currentValue) - throws SerDeException, HiveException, IOException { - - if (useMinMax) { - // Peek at the BinarySortable key to extract the long so we can determine min and max. - byte[] keyBytes = currentKey.getBytes(); - int keyLength = currentKey.getLength(); - keyBinarySortableDeserializeRead.set(keyBytes, 0, keyLength); - if (keyBinarySortableDeserializeRead.readCheckNull()) { - if (isOuterJoin) { - return; - } else { - // For inner join, we expect all NULL values to have been filtered out before now. - throw new HiveException("Unexpected NULL"); - } - } - long key = 0; - switch (hashTableKeyType) { - case BOOLEAN: - key = (keyBinarySortableDeserializeRead.readBoolean() ? 
1 : 0); - break; - case BYTE: - key = (long) keyBinarySortableDeserializeRead.readByte(); - break; - case SHORT: - key = (long) keyBinarySortableDeserializeRead.readShort(); - break; - case INT: - key = (long) keyBinarySortableDeserializeRead.readInt(); - break; - case LONG: - key = keyBinarySortableDeserializeRead.readLong(); - break; - default: - throw new RuntimeException("Unexpected hash table key type " + hashTableKeyType.name()); - } - if (key < min) { - min = key; - } - if (key > max) { - max = key; - } - - // byte[] bytes = Arrays.copyOf(currentKey.get(), currentKey.getLength()); - // LOG.debug("VectorMapJoinOptimizedLongCommon adaptPutRow key " + key + " min " + min + " max " + max + " hashTableKeyType " + hashTableKeyType.name() + " hex " + Hex.encodeHexString(bytes)); - - } - - hashTable.putRowInternal(currentKey, currentValue); - } - */ - public SerializedBytes serialize(long key) throws IOException { keyBinarySortableSerializeWrite.reset(); http://git-wip-us.apache.org/repos/asf/hive/blob/0b62e6f3/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedStringCommon.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedStringCommon.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedStringCommon.java index 39c2d49..072919b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedStringCommon.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedStringCommon.java @@ -45,32 +45,6 @@ public class VectorMapJoinOptimizedStringCommon { private transient SerializedBytes serializedBytes; - /* - private BytesWritable bytesWritable; - - public void adaptPutRow(VectorMapJoinOptimizedHashTable hashTable, - BytesWritable currentKey, BytesWritable currentValue) - throws SerDeException, HiveException, IOException { - - byte[] keyBytes = currentKey.getBytes(); - int keyLength = currentKey.getLength(); - keyBinarySortableDeserializeRead.set(keyBytes, 0, keyLength); - if (keyBinarySortableDeserializeRead.readCheckNull()) { - if (isOuterJoin) { - return; - } else { - // For inner join, we expect all NULL values to have been filtered out before now. 
- throw new HiveException("Unexpected NULL"); - } - } - keyBinarySortableDeserializeRead.readString(readStringResults); - - bytesWritable.set(readStringResults.bytes, readStringResults.start, readStringResults.length); - - hashTable.putRowInternal(bytesWritable, currentValue); - } - */ - public SerializedBytes serialize(byte[] keyBytes, int keyStart, int keyLength) throws IOException { keyBinarySortableSerializeWrite.reset(); http://git-wip-us.apache.org/repos/asf/hive/blob/0b62e6f3/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index b760988..46bdba6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -29,6 +29,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Properties; import java.util.Set; import java.util.Stack; import java.util.regex.Pattern; @@ -648,11 +649,27 @@ public class Vectorizer implements PhysicalPlanResolver { if (inputFileFormatClassName.equals(TextInputFormat.class.getName()) && deserializerClassName.equals(LazySimpleSerDe.class.getName())) { - pd.setVectorPartitionDesc( - VectorPartitionDesc.createVectorDeserialize( - inputFileFormatClassName, VectorDeserializeType.LAZY_SIMPLE)); + Properties properties = pd.getTableDesc().getProperties(); + String lastColumnTakesRestString = + properties.getProperty(serdeConstants.SERIALIZATION_LAST_COLUMN_TAKES_REST); + boolean lastColumnTakesRest = + (lastColumnTakesRestString != null && + lastColumnTakesRestString.equalsIgnoreCase("true")); + if (lastColumnTakesRest) { + + // If row mode will not catch this, then inform. 
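// (Illustrative comment, assumed: a table configured with
// 'serialization.last.column.takes.rest'='true' folds all remaining text, including
// separators, into its last column; the rewritten LazySimpleDeserializeRead throws
// for that mode, so such partitions are left to the non-vectorized or
// row-deserialize paths.)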
+ if (useRowDeserialize) { + LOG.info("Input format: " + inputFileFormatClassName + " cannot be vectorized" + + " when " + serdeConstants.SERIALIZATION_LAST_COLUMN_TAKES_REST + "is true"); + return false; + } + } else { + pd.setVectorPartitionDesc( + VectorPartitionDesc.createVectorDeserialize( + inputFileFormatClassName, VectorDeserializeType.LAZY_SIMPLE)); - return true; + return true; + } } else if (inputFileFormatClassName.equals(SequenceFileInputFormat.class.getName()) && deserializerClassName.equals(LazyBinarySerDe.class.getName())) { http://git-wip-us.apache.org/repos/asf/hive/blob/0b62e6f3/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java index 238c136..8ffff9d 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java @@ -96,7 +96,7 @@ public class TestVectorSerDeRow extends TestCase { Object expected = expectedRow[i]; PrimitiveCategory primitiveCategory = primitiveCategories[i]; PrimitiveTypeInfo primitiveTypeInfo = source.primitiveTypeInfos()[i]; - if (deserializeRead.readCheckNull()) { + if (!deserializeRead.readNextField()) { throw new HiveException("Unexpected NULL"); } switch (primitiveCategory) { @@ -282,9 +282,7 @@ public class TestVectorSerDeRow extends TestCase { throw new HiveException("Unexpected primitive category " + primitiveCategory); } } - deserializeRead.extraFieldsCheck(); - TestCase.assertTrue(!deserializeRead.readBeyondConfiguredFieldsWarned()); - TestCase.assertTrue(!deserializeRead.bufferRangeHasExtraDataWarned()); + TestCase.assertTrue(deserializeRead.isEndOfInputReached()); } void serializeBatch(VectorizedRowBatch batch, VectorSerializeRow vectorSerializeRow, @@ -382,11 +380,13 @@ public class TestVectorSerDeRow extends TestCase { Object[] expectedRow = randomRows[firstRandomRowIndex + i]; for (int c = 0; c < rowSize; c++) { - if (row[c] == null) { + Object rowObj = row[c]; + Object expectedObj = expectedRow[c]; + if (rowObj == null) { fail("Unexpected NULL from extractRow"); } - if (!row[c].equals(expectedRow[c])) { - fail("Row " + (firstRandomRowIndex + i) + " and column " + c + " mismatch (" + primitiveTypeInfos[c].getPrimitiveCategory() + " actual value " + row[c] + " and expected value " + expectedRow[c] + ")"); + if (!rowObj.equals(expectedObj)) { + fail("Row " + (firstRandomRowIndex + i) + " and column " + c + " mismatch (" + primitiveTypeInfos[c].getPrimitiveCategory() + " actual value " + rowObj + " and expected value " + expectedObj + ")"); } } } http://git-wip-us.apache.org/repos/asf/hive/blob/0b62e6f3/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/CheckFastRowHashMap.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/CheckFastRowHashMap.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/CheckFastRowHashMap.java index 7f68186..bc7a658 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/CheckFastRowHashMap.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/CheckFastRowHashMap.java @@ -79,10 +79,7 @@ public class CheckFastRowHashMap extends CheckFastHashTable { Writable writable = (Writable) row[index]; 
VerifyFastRow.verifyDeserializeRead(lazyBinaryDeserializeRead, (PrimitiveTypeInfo) typeInfos[index], writable); } - lazyBinaryDeserializeRead.extraFieldsCheck(); - TestCase.assertTrue(!lazyBinaryDeserializeRead.readBeyondConfiguredFieldsWarned()); - - TestCase.assertTrue(!lazyBinaryDeserializeRead.bufferRangeHasExtraDataWarned()); + TestCase.assertTrue(lazyBinaryDeserializeRead.isEndOfInputReached()); ref = hashMapResult.next(); if (a == count - 1) { @@ -171,10 +168,7 @@ public class CheckFastRowHashMap extends CheckFastHashTable { if (thrown) { TestCase.fail("Not expecting an exception to be thrown for the non-clipped case..."); } - lazyBinaryDeserializeRead.extraFieldsCheck(); - TestCase.assertTrue(!lazyBinaryDeserializeRead.readBeyondConfiguredFieldsWarned()); - - TestCase.assertTrue(!lazyBinaryDeserializeRead.bufferRangeHasExtraDataWarned()); + TestCase.assertTrue(lazyBinaryDeserializeRead.isEndOfInputReached()); } ref = hashMapResult.next(); http://git-wip-us.apache.org/repos/asf/hive/blob/0b62e6f3/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VerifyFastRow.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VerifyFastRow.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VerifyFastRow.java index 118e9e2..239db73 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VerifyFastRow.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VerifyFastRow.java @@ -64,7 +64,7 @@ public class VerifyFastRow { boolean isNull; - isNull = deserializeRead.readCheckNull(); + isNull = !deserializeRead.readNextField(); if (isNull) { if (writable != null) { TestCase.fail("Field reports null but object is not null"); http://git-wip-us.apache.org/repos/asf/hive/blob/0b62e6f3/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableDeserializeRead.java ---------------------------------------------------------------------- diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableDeserializeRead.java b/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableDeserializeRead.java index 0cbc8d0..a7785b2 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableDeserializeRead.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableDeserializeRead.java @@ -37,7 +37,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; * Directly deserialize with the caller reading field-by-field the LazyBinary serialization format. * * The caller is responsible for calling the read method for the right type of each field - * (after calling readCheckNull). + * (after calling readNextField). * * Reading some fields require a results object to receive value information. A separate * results object is created by the caller at initialization per different field even for the same @@ -53,7 +53,7 @@ public final class BinarySortableDeserializeRead extends DeserializeRead { // The sort order (ascending/descending) for each field. Set to true when descending (invert). private boolean[] columnSortOrderIsDesc; - // Which field we are on. We start with -1 so readCheckNull can increment once and the read + // Which field we are on. We start with -1 so readNextField can increment once and the read // field data methods don't increment. 
private int fieldIndex; @@ -72,9 +72,6 @@ public final class BinarySortableDeserializeRead extends DeserializeRead { private byte[] tempDecimalBuffer; - private boolean readBeyondConfiguredFieldsWarned; - private boolean bufferRangeHasExtraDataWarned; - private InputByteBuffer inputByteBuffer = new InputByteBuffer(); /* @@ -96,8 +93,6 @@ public final class BinarySortableDeserializeRead extends DeserializeRead { Arrays.fill(this.columnSortOrderIsDesc, false); } inputByteBuffer = new InputByteBuffer(); - readBeyondConfiguredFieldsWarned = false; - bufferRangeHasExtraDataWarned = false; internalBufferLen = -1; } @@ -151,28 +146,28 @@ public final class BinarySortableDeserializeRead extends DeserializeRead { } /* - * Reads the NULL information for a field. + * Reads the the next field. + * + * Afterwards, reading is positioned to the next field. + * + * @return Return true when the field was not null and data is put in the appropriate + * current* member. + * Otherwise, false when the field is null. * - * @return Returns true when the field is NULL; reading is positioned to the next field. - * Otherwise, false when the field is NOT NULL; reading is positioned to the field data. */ @Override - public boolean readCheckNull() throws IOException { + public boolean readNextField() throws IOException { // We start with fieldIndex as -1 so we can increment once here and then the read // field data methods don't increment. fieldIndex++; if (fieldIndex >= fieldCount) { - // Reading beyond the specified field count produces NULL. - if (!readBeyondConfiguredFieldsWarned) { - doReadBeyondConfiguredFieldsWarned(); - } - return true; + return false; } if (inputByteBuffer.isEof()) { // Also, reading beyond our byte range produces NULL. - return true; + return false; } fieldStart = inputByteBuffer.tell(); @@ -180,20 +175,19 @@ public final class BinarySortableDeserializeRead extends DeserializeRead { byte isNullByte = inputByteBuffer.read(columnSortOrderIsDesc[fieldIndex]); if (isNullByte == 0) { - return true; + return false; } /* * We have a field and are positioned to it. Read it. */ - boolean isNull = false; // Assume. 
switch (primitiveCategories[fieldIndex]) { case BOOLEAN: currentBoolean = (inputByteBuffer.read(columnSortOrderIsDesc[fieldIndex]) == 2); - break; + return true; case BYTE: currentByte = (byte) (inputByteBuffer.read(columnSortOrderIsDesc[fieldIndex]) ^ 0x80); - break; + return true; case SHORT: { final boolean invert = columnSortOrderIsDesc[fieldIndex]; @@ -201,7 +195,7 @@ public final class BinarySortableDeserializeRead extends DeserializeRead { v = (v << 8) + (inputByteBuffer.read(invert) & 0xff); currentShort = (short) v; } - break; + return true; case INT: { final boolean invert = columnSortOrderIsDesc[fieldIndex]; @@ -211,7 +205,7 @@ public final class BinarySortableDeserializeRead extends DeserializeRead { } currentInt = v; } - break; + return true; case LONG: { final boolean invert = columnSortOrderIsDesc[fieldIndex]; @@ -221,7 +215,7 @@ public final class BinarySortableDeserializeRead extends DeserializeRead { } currentLong = v; } - break; + return true; case DATE: { final boolean invert = columnSortOrderIsDesc[fieldIndex]; @@ -231,7 +225,7 @@ public final class BinarySortableDeserializeRead extends DeserializeRead { } currentDateWritable.set(v); } - break; + return true; case TIMESTAMP: { if (tempTimestampBytes == null) { @@ -243,7 +237,7 @@ public final class BinarySortableDeserializeRead extends DeserializeRead { } currentTimestampWritable.setBinarySortable(tempTimestampBytes, 0); } - break; + return true; case FLOAT: { final boolean invert = columnSortOrderIsDesc[fieldIndex]; @@ -260,7 +254,7 @@ public final class BinarySortableDeserializeRead extends DeserializeRead { } currentFloat = Float.intBitsToFloat(v); } - break; + return true; case DOUBLE: { final boolean invert = columnSortOrderIsDesc[fieldIndex]; @@ -277,7 +271,7 @@ public final class BinarySortableDeserializeRead extends DeserializeRead { } currentDouble = Double.longBitsToDouble(v); } - break; + return true; case BINARY: case STRING: case CHAR: @@ -333,7 +327,7 @@ public final class BinarySortableDeserializeRead extends DeserializeRead { } } } - break; + return true; case INTERVAL_YEAR_MONTH: { final boolean invert = columnSortOrderIsDesc[fieldIndex]; @@ -343,7 +337,7 @@ public final class BinarySortableDeserializeRead extends DeserializeRead { } currentHiveIntervalYearMonthWritable.set(v); } - break; + return true; case INTERVAL_DAY_TIME: { final boolean invert = columnSortOrderIsDesc[fieldIndex]; @@ -357,7 +351,7 @@ public final class BinarySortableDeserializeRead extends DeserializeRead { } currentHiveIntervalDayTimeWritable.set(totalSecs, nanos); } - break; + return true; case DECIMAL: { // Since enforcing precision and scale can cause a HiveDecimal to become NULL, @@ -428,25 +422,26 @@ public final class BinarySortableDeserializeRead extends DeserializeRead { HiveDecimal decimal = currentHiveDecimalWritable.getHiveDecimal(precision, scale); if (decimal == null) { - isNull = true; - } else { - // Put value back into writable. - currentHiveDecimalWritable.set(decimal); + return false; } + // Put value back into writable. + currentHiveDecimalWritable.set(decimal); } - break; + return true; default: throw new RuntimeException("Unexpected primitive type category " + primitiveCategories[fieldIndex]); } + } - /* - * Now that we have read through the field -- did we really want it? - */ - if (columnsToInclude != null && !columnsToInclude[fieldIndex]) { - isNull = true; - } - - return isNull; + /* + * Reads through an undesired field. + * + * No data values are valid after this call. 
+ * Designed for skipping columns that are not included. + */ + public void skipNextField() throws IOException { + // Not a known use case for BinarySortable -- so don't optimize. + readNextField(); } @Override @@ -476,44 +471,17 @@ public final class BinarySortableDeserializeRead extends DeserializeRead { } /* - * Call this method after all fields have been read to check for extra fields. - */ - public void extraFieldsCheck() { - if (!inputByteBuffer.isEof()) { - // We did not consume all of the byte range. - if (!bufferRangeHasExtraDataWarned) { - // Warn only once. - int length = inputByteBuffer.getEnd() - start; - int remaining = inputByteBuffer.getEnd() - inputByteBuffer.tell(); - LOG.info("Not all fields were read in the buffer range! Buffer range " + start - + " for length " + length + " but " + remaining + " bytes remain. " - + "(total buffer length " + inputByteBuffer.getData().length + ")" - + " Ignoring similar problems."); - bufferRangeHasExtraDataWarned = true; - } - } - } - - /* - * Read integrity warning flags. - */ - @Override - public boolean readBeyondConfiguredFieldsWarned() { - return readBeyondConfiguredFieldsWarned; - } - @Override - public boolean bufferRangeHasExtraDataWarned() { - return bufferRangeHasExtraDataWarned; - } - - /* - * Pull these out of the regular execution path. + * Call this method may be called after all the all fields have been read to check + * for unread fields. + * + * Note that when optimizing reading to stop reading unneeded include columns, worrying + * about whether all data is consumed is not appropriate (often we aren't reading it all by + * design). + * + * Since LazySimpleDeserializeRead parses the line through the last desired column it does + * support this function. */ - - private void doReadBeyondConfiguredFieldsWarned() { - // Warn only once. - LOG.info("Reading beyond configured fields! Configured " + fieldCount + " fields but " - + " reading more (NULLs returned). Ignoring similar problems."); - readBeyondConfiguredFieldsWarned = true; + public boolean isEndOfInputReached() { + return inputByteBuffer.isEof(); } } http://git-wip-us.apache.org/repos/asf/hive/blob/0b62e6f3/serde/src/java/org/apache/hadoop/hive/serde2/fast/DeserializeRead.java ---------------------------------------------------------------------- diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/fast/DeserializeRead.java b/serde/src/java/org/apache/hadoop/hive/serde2/fast/DeserializeRead.java index 1600fec..ac931d6 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/fast/DeserializeRead.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/fast/DeserializeRead.java @@ -33,7 +33,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; * Directly deserialize with the caller reading field-by-field a serialization format. * * The caller is responsible for calling the read method for the right type of each field - * (after calling readCheckNull). + * (after calling readNextField). * * Reading some fields require a results object to receive value information. A separate * results object is created by the caller at initialization per different field even for the same @@ -49,18 +49,16 @@ public abstract class DeserializeRead { protected boolean useExternalBuffer; - protected boolean[] columnsToInclude; - protected Category[] categories; protected PrimitiveCategory[] primitiveCategories; /** * Constructor. 
* - * When useExternalBuffer is specified true and readCheckNull reads a string/char/varchar/binary + * When useExternalBuffer is specified true and readNextField reads a string/char/varchar/binary * field, it will request an external buffer to receive the data of format conversion. * - * if (!deserializeRead.readCheckNull()) { + * if (deserializeRead.readNextField()) { * if (deserializeRead.currentExternalBufferNeeded) { * <Ensure external buffer is as least deserializeRead.currentExternalBufferNeededLen bytes> * deserializeRead.copyToExternalBuffer(externalBuffer, externalBufferStart); @@ -121,8 +119,6 @@ public abstract class DeserializeRead { this.useExternalBuffer = useExternalBuffer; } - - columnsToInclude = null; } // Don't allow for public. @@ -137,37 +133,62 @@ public abstract class DeserializeRead { } /* - * If some fields are are not going to be used by the query, use this routine to specify - * the columns to return. The readCheckNull method will automatically return NULL for the - * other columns. + * Set the range of bytes to be deserialized. */ - public void setColumnsToInclude(boolean[] columnsToInclude) { - this.columnsToInclude = columnsToInclude; - } + public abstract void set(byte[] bytes, int offset, int length); /* - * Set the range of bytes to be deserialized. + * Reads the the next field. + * + * Afterwards, reading is positioned to the next field. + * + * @return Return true when the field was not null and data is put in the appropriate + * current* member. + * Otherwise, false when the field is null. + * */ - public abstract void set(byte[] bytes, int offset, int length); + public abstract boolean readNextField() throws IOException; /* - * Reads the NULL information for a field. + * Reads through an undesired field. * - * @return Return true when the field is NULL; reading is positioned to the next field. - * Otherwise, false when the field is NOT NULL; reading is positioned to the field data. + * No data values are valid after this call. + * Designed for skipping columns that are not included. */ - public abstract boolean readCheckNull() throws IOException; + public abstract void skipNextField() throws IOException; /* - * Call this method after all fields have been read to check for extra fields. + * Returns true if the readField method is supported; */ - public abstract void extraFieldsCheck(); + public boolean isReadFieldSupported() { + return false; + } /* - * Read integrity warning flags. + * When supported, read a field by field number (i.e. random access). + * + * Currently, only LazySimpleDeserializeRead supports this. + * + * @return Return true when the field was not null and data is put in the appropriate + * current* member. + * Otherwise, false when the field is null. + */ + public boolean readField(int fieldIndex) throws IOException { + throw new RuntimeException("Not supported"); + } + + /* + * Call this method may be called after all the all fields have been read to check + * for unread fields. + * + * Note that when optimizing reading to stop reading unneeded include columns, worrying + * about whether all data is consumed is not appropriate (often we aren't reading it all by + * design). + * + * Since LazySimpleDeserializeRead parses the line through the last desired column it does + * support this function. 
*/ - public abstract boolean readBeyondConfiguredFieldsWarned(); - public abstract boolean bufferRangeHasExtraDataWarned(); + public abstract boolean isEndOfInputReached(); /* * Get detailed read position information to help diagnose exceptions. @@ -175,7 +196,7 @@ public abstract class DeserializeRead { public abstract String getDetailedReadPositionString(); /* - * These members hold the current value that was read when readCheckNull return false. + * These members hold the current value that was read when readNextField return false. */ /* http://git-wip-us.apache.org/repos/asf/hive/blob/0b62e6f3/serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleDeserializeRead.java ---------------------------------------------------------------------- diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleDeserializeRead.java b/serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleDeserializeRead.java index 07709d8..daf2cfb 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleDeserializeRead.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleDeserializeRead.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hive.serde2.lazy.fast; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.charset.CharacterCodingException; +import java.nio.charset.StandardCharsets; import java.sql.Date; import java.util.Arrays; @@ -37,6 +38,7 @@ import org.apache.hadoop.hive.serde2.lazy.LazyLong; import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters; import org.apache.hadoop.hive.serde2.lazy.LazyShort; import org.apache.hadoop.hive.serde2.lazy.LazyUtils; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.io.Text; @@ -47,7 +49,7 @@ import org.apache.hive.common.util.TimestampParser; * serialization format. * * The caller is responsible for calling the read method for the right type of each field - * (after calling readCheckNull). + * (after calling readNextField). * * Reading some fields require a results object to receive value information. A separate * results object is created by the caller at initialization per different field even for the same @@ -62,49 +64,63 @@ public final class LazySimpleDeserializeRead extends DeserializeRead { private int[] startPosition; - private byte separator; - private boolean isEscaped; - private byte escapeChar; - private byte[] nullSequenceBytes; - private boolean isExtendedBooleanLiteral; - private boolean lastColumnTakesRest; + private final byte separator; + private final boolean isEscaped; + private final byte escapeChar; + private final int[] escapeCounts; + private final byte[] nullSequenceBytes; + private final boolean isExtendedBooleanLiteral; + + private final int fieldCount; private byte[] bytes; private int start; - private int offset; private int end; - private int fieldCount; - private int fieldIndex; - private int parseFieldIndex; - private int fieldStart; - private int fieldLength; + private boolean parsed; + + // Used by readNextField/skipNextField and not by readField. + private int nextFieldIndex; + // For getDetailedReadPositionString. + private int currentFieldIndex; + private int currentFieldStart; + private int currentFieldLength; + + // For string/char/varchar buffering when there are escapes. 
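// Illustration (assumed): parse() counts escapes per field, so a field serialized
// as a\,b (one escaped separator) gets an escapeCounts entry of 1 and must be
// un-escaped into the internal buffer (or a caller-supplied external buffer),
// while fields with a zero count can be referenced in place.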
private int internalBufferLen; private byte[] internalBuffer; - private TimestampParser timestampParser; + private final TimestampParser timestampParser; - private boolean extraFieldWarned; - private boolean missingFieldWarned; + private boolean isEndOfInputReached; public LazySimpleDeserializeRead(TypeInfo[] typeInfos, boolean useExternalBuffer, byte separator, LazySerDeParameters lazyParams) { super(typeInfos, useExternalBuffer); + fieldCount = typeInfos.length; + // Field length is difference between positions hence one extra. - startPosition = new int[typeInfos.length + 1]; + startPosition = new int[fieldCount + 1]; this.separator = separator; isEscaped = lazyParams.isEscaped(); - escapeChar = lazyParams.getEscapeChar(); + if (isEscaped) { + escapeChar = lazyParams.getEscapeChar(); + escapeCounts = new int[fieldCount]; + } else { + escapeChar = (byte) 0; + escapeCounts = null; + } nullSequenceBytes = lazyParams.getNullSequence().getBytes(); isExtendedBooleanLiteral = lazyParams.isExtendedBooleanLiteral(); - lastColumnTakesRest = lazyParams.isLastColumnTakesRest(); + if (lazyParams.isLastColumnTakesRest()) { + throw new RuntimeException("serialization.last.column.takes.rest not supported"); + } + + timestampParser = new TimestampParser(); - fieldCount = typeInfos.length; - extraFieldWarned = false; - missingFieldWarned = false; internalBufferLen = -1; } @@ -113,21 +129,16 @@ public final class LazySimpleDeserializeRead extends DeserializeRead { this(typeInfos, useExternalBuffer, lazyParams.getSeparators()[0], lazyParams); } - // Not public since we must have the field count so every 8 fields NULL bytes can be navigated. - private LazySimpleDeserializeRead() { - super(); - } - /* * Set the range of bytes to be deserialized. */ @Override public void set(byte[] bytes, int offset, int length) { this.bytes = bytes; - this.offset = offset; start = offset; end = offset + length; - fieldIndex = -1; + parsed = false; + nextFieldIndex = -1; } /* @@ -147,19 +158,16 @@ public final class LazySimpleDeserializeRead extends DeserializeRead { sb.append(" fields with types "); sb.append(Arrays.toString(typeInfos)); sb.append(". "); - if (fieldIndex == -1) { - sb.append("Error during field delimitor parsing of field #"); - sb.append(parseFieldIndex); + if (!parsed) { + sb.append("Error during field separator parsing"); } else { sb.append("Read field #"); - sb.append(fieldIndex); + sb.append(currentFieldIndex); sb.append(" at field start position "); - sb.append(startPosition[fieldIndex]); - int currentFieldLength = startPosition[fieldIndex + 1] - startPosition[fieldIndex] - 1; + sb.append(startPosition[currentFieldIndex]); + int currentFieldLength = startPosition[currentFieldIndex + 1] - startPosition[currentFieldIndex] - 1; sb.append(" for field length "); sb.append(currentFieldLength); - sb.append(" current read offset "); - sb.append(offset); } return sb.toString(); @@ -173,395 +181,406 @@ public final class LazySimpleDeserializeRead extends DeserializeRead { */ private void parse() { - int structByteEnd = end; + int fieldId = 0; int fieldByteBegin = start; int fieldByteEnd = start; - // Kept as a member variable to support getDetailedReadPositionString. 
- parseFieldIndex = 0; + final byte separator = this.separator; + final int fieldCount = this.fieldCount; + final int[] startPosition = this.startPosition; + final byte[] bytes = this.bytes; + final int end = this.end; - // Go through all bytes in the byte[] - while (fieldByteEnd <= structByteEnd) { - if (fieldByteEnd == structByteEnd || bytes[fieldByteEnd] == separator) { - // Reached the end of a field? - if (lastColumnTakesRest && parseFieldIndex == fieldCount - 1) { - fieldByteEnd = structByteEnd; - } - startPosition[parseFieldIndex] = fieldByteBegin; - parseFieldIndex++; - if (parseFieldIndex == fieldCount || fieldByteEnd == structByteEnd) { - // All fields have been parsed, or bytes have been parsed. - // We need to set the startPosition of fields.length to ensure we - // can use the same formula to calculate the length of each field. - // For missing fields, their starting positions will all be the same, - // which will make their lengths to be -1 and uncheckedGetField will - // return these fields as NULLs. - for (int i = parseFieldIndex; i <= fieldCount; i++) { - startPosition[i] = fieldByteEnd + 1; + /* + * Optimize the loops by pulling special end cases and global decisions like isEscaped out! + */ + if (!isEscaped) { + while (fieldByteEnd < end) { + if (bytes[fieldByteEnd] == separator) { + startPosition[fieldId++] = fieldByteBegin; + if (fieldId == fieldCount) { + break; } - break; + fieldByteBegin = ++fieldByteEnd; + } else { + fieldByteEnd++; } - fieldByteBegin = fieldByteEnd + 1; - fieldByteEnd++; - } else { - if (isEscaped && bytes[fieldByteEnd] == escapeChar - && fieldByteEnd + 1 < structByteEnd) { - // ignore the char after escape_char + } + // End serves as final separator. + if (fieldByteEnd == end && fieldId < fieldCount) { + startPosition[fieldId++] = fieldByteBegin; + } + } else { + final byte escapeChar = this.escapeChar; + final int endLessOne = end - 1; + final int[] escapeCounts = this.escapeCounts; + int escapeCount = 0; + // Process the bytes that can be escaped (the last one can't be). + while (fieldByteEnd < endLessOne) { + if (bytes[fieldByteEnd] == separator) { + escapeCounts[fieldId] = escapeCount; + escapeCount = 0; + startPosition[fieldId++] = fieldByteBegin; + if (fieldId == fieldCount) { + break; + } + fieldByteBegin = ++fieldByteEnd; + } else if (bytes[fieldByteEnd] == escapeChar) { + // Ignore the char after escape_char fieldByteEnd += 2; + escapeCount++; + } else { + fieldByteEnd++; + } + } + // Process the last byte if necessary. + if (fieldByteEnd == endLessOne && fieldId < fieldCount) { + if (bytes[fieldByteEnd] == separator) { + escapeCounts[fieldId] = escapeCount; + escapeCount = 0; + startPosition[fieldId++] = fieldByteBegin; + if (fieldId <= fieldCount) { + fieldByteBegin = ++fieldByteEnd; + } } else { fieldByteEnd++; } } + // End serves as final separator. + if (fieldByteEnd == end && fieldId < fieldCount) { + escapeCounts[fieldId] = escapeCount; + startPosition[fieldId++] = fieldByteBegin; + } } - // Extra bytes at the end? - if (!extraFieldWarned && fieldByteEnd < structByteEnd) { - doExtraFieldWarned(); + if (fieldId == fieldCount || fieldByteEnd == end) { + // All fields have been parsed, or bytes have been parsed. + // We need to set the startPosition of fields.length to ensure we + // can use the same formula to calculate the length of each field. + // For missing fields, their starting positions will all be the same, + // which will make their lengths to be -1 and uncheckedGetField will + // return these fields as NULLs. 
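// Worked example (assumed): with fieldCount == 4 and the input line "a,b", parsing
// finds fields 0 and 1, and the fill below sets startPosition[2..4] to end + 1, so
// fields 2 and 3 compute a length of -1 and readField reports them as NULL
// (returns false).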
+ Arrays.fill(startPosition, fieldId, startPosition.length, fieldByteEnd + 1); } - // Missing fields? - if (!missingFieldWarned && parseFieldIndex < fieldCount) { - doMissingFieldWarned(parseFieldIndex); - } + isEndOfInputReached = (fieldByteEnd == end); } /* - * Reads the NULL information for a field. + * Reads the the next field. + * + * Afterwards, reading is positioned to the next field. + * + * @return Return true when the field was not null and data is put in the appropriate + * current* member. + * Otherwise, false when the field is null. * - * @return Returns true when the field is NULL; reading is positioned to the next field. - * Otherwise, false when the field is NOT NULL; reading is positioned to the field data. */ @Override - public boolean readCheckNull() { - if (fieldIndex == -1) { + public boolean readNextField() throws IOException { + if (nextFieldIndex + 1 >= fieldCount) { + return false; + } + nextFieldIndex++; + return readField(nextFieldIndex); + } + + /* + * Reads through an undesired field. + * + * No data values are valid after this call. + * Designed for skipping columns that are not included. + */ + public void skipNextField() throws IOException { + if (!parsed) { parse(); - fieldIndex = 0; - } else if (fieldIndex + 1 >= fieldCount) { - return true; + parsed = true; + } + if (nextFieldIndex + 1 >= fieldCount) { + // No more. } else { - fieldIndex++; + nextFieldIndex++; } + } - // Do we want this field? - if (columnsToInclude != null && !columnsToInclude[fieldIndex]) { + @Override + public boolean isReadFieldSupported() { + return true; + } + + private boolean checkNull(byte[] bytes, int start, int len) { + if (len != nullSequenceBytes.length) { + return false; + } + final byte[] nullSequenceBytes = this.nullSequenceBytes; + switch(len) { + case 0: + return true; + case 2: + return bytes[start] == nullSequenceBytes[0] && bytes[start+1] == nullSequenceBytes[1]; + case 4: + return bytes[start] == nullSequenceBytes[0] && bytes[start+1] == nullSequenceBytes[1] + && bytes[start+2] == nullSequenceBytes[2] && bytes[start+3] == nullSequenceBytes[3]; + default: + for (int i = 0; i < nullSequenceBytes.length; i++) { + if (bytes[start + i] != nullSequenceBytes[i]) { + return false; + } + } return true; } + } - fieldStart = startPosition[fieldIndex]; - fieldLength = startPosition[fieldIndex + 1] - startPosition[fieldIndex] - 1; + /* + * When supported, read a field by field number (i.e. random access). + * + * Currently, only LazySimpleDeserializeRead supports this. + * + * @return Return true when the field was not null and data is put in the appropriate + * current* member. + * Otherwise, false when the field is null. + */ + public boolean readField(int fieldIndex) throws IOException { + + if (!parsed) { + parse(); + parsed = true; + } + + currentFieldIndex = fieldIndex; + + final int fieldStart = startPosition[fieldIndex]; + currentFieldStart = fieldStart; + final int fieldLength = startPosition[fieldIndex + 1] - startPosition[fieldIndex] - 1; + currentFieldLength = fieldLength; if (fieldLength < 0) { - return true; + return false; } + final byte[] bytes = this.bytes; + // Is the field the configured string representing NULL? 
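// (Illustrative comment, assumed: with the default null sequence \N, a two-byte
// field matching those bytes hits the unrolled case 2 comparison in checkNull via
// the check just below, and the column is returned as NULL.)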
 
-    fieldStart = startPosition[fieldIndex];
-    fieldLength = startPosition[fieldIndex + 1] - startPosition[fieldIndex] - 1;
+  /*
+   * When supported, read a field by field number (i.e. random access).
+   *
+   * Currently, only LazySimpleDeserializeRead supports this.
+   *
+   * @return True when the field was not null and data is put in the appropriate
+   *         current* member.
+   *         Otherwise, false when the field is null.
+   */
+  public boolean readField(int fieldIndex) throws IOException {
+
+    if (!parsed) {
+      parse();
+      parsed = true;
+    }
+
+    currentFieldIndex = fieldIndex;
+
+    final int fieldStart = startPosition[fieldIndex];
+    currentFieldStart = fieldStart;
+    final int fieldLength = startPosition[fieldIndex + 1] - startPosition[fieldIndex] - 1;
+    currentFieldLength = fieldLength;
     if (fieldLength < 0) {
-      return true;
+      return false;
     }
 
+    final byte[] bytes = this.bytes;
+
+    // Is the field the configured string representing NULL?
     if (nullSequenceBytes != null) {
-      if (fieldLength == nullSequenceBytes.length) {
-        int i = 0;
-        while (true) {
-          if (bytes[fieldStart + i] != nullSequenceBytes[i]) {
-            break;
-          }
-          i++;
-          if (i >= fieldLength) {
-            return true;
-          }
-        }
+      if (checkNull(bytes, fieldStart, fieldLength)) {
+        return false;
       }
     }
 
-    /*
-     * We have a field and are positioned to it. Read it.
-     */
-    switch (primitiveCategories[fieldIndex]) {
-    case BOOLEAN:
-      {
-        int i = fieldStart;
-        if (fieldLength == 4) {
-          if ((bytes[i] == 'T' || bytes[i] == 't') &&
-              (bytes[i + 1] == 'R' || bytes[i + 1] == 'r') &&
-              (bytes[i + 2] == 'U' || bytes[i + 2] == 'u') &&
-              (bytes[i + 3] == 'E' || bytes[i + 3] == 'e')) {
-            currentBoolean = true;
-          } else {
-            // No boolean value match for 5 char field.
-            return true;
-          }
-        } else if (fieldLength == 5) {
-          if ((bytes[i] == 'F' || bytes[i] == 'f') &&
-              (bytes[i + 1] == 'A' || bytes[i + 1] == 'a') &&
-              (bytes[i + 2] == 'L' || bytes[i + 2] == 'l') &&
-              (bytes[i + 3] == 'S' || bytes[i + 3] == 's') &&
-              (bytes[i + 4] == 'E' || bytes[i + 4] == 'e')) {
-            currentBoolean = false;
-          } else {
-            // No boolean value match for 4 char field.
-            return true;
-          }
-        } else if (isExtendedBooleanLiteral && fieldLength == 1) {
-          byte b = bytes[fieldStart];
-          if (b == '1' || b == 't' || b == 'T') {
-            currentBoolean = true;
-          } else if (b == '0' || b == 'f' || b == 'F') {
-            currentBoolean = false;
-          } else {
-            // No boolean value match for extended 1 char field.
-            return true;
-          }
-        } else {
-          // No boolean value match for other lengths.
-          return true;
-        }
-      }
-      break;
+    try {
+      /*
+       * We have a field and are positioned to it. Read it.
+       */
+      switch (primitiveCategories[fieldIndex]) {
+      case BOOLEAN:
+        {
+          int i = fieldStart;
+          if (fieldLength == 4) {
+            if ((bytes[i] == 'T' || bytes[i] == 't') &&
+                (bytes[i + 1] == 'R' || bytes[i + 1] == 'r') &&
+                (bytes[i + 2] == 'U' || bytes[i + 2] == 'u') &&
+                (bytes[i + 3] == 'E' || bytes[i + 3] == 'e')) {
+              currentBoolean = true;
+            } else {
+              // No boolean value match for 4 char field.
+              return false;
+            }
+          } else if (fieldLength == 5) {
+            if ((bytes[i] == 'F' || bytes[i] == 'f') &&
+                (bytes[i + 1] == 'A' || bytes[i + 1] == 'a') &&
+                (bytes[i + 2] == 'L' || bytes[i + 2] == 'l') &&
+                (bytes[i + 3] == 'S' || bytes[i + 3] == 's') &&
+                (bytes[i + 4] == 'E' || bytes[i + 4] == 'e')) {
+              currentBoolean = false;
+            } else {
+              // No boolean value match for 5 char field.
+              return false;
+            }
+          } else if (isExtendedBooleanLiteral && fieldLength == 1) {
+            byte b = bytes[fieldStart];
+            if (b == '1' || b == 't' || b == 'T') {
+              currentBoolean = true;
+            } else if (b == '0' || b == 'f' || b == 'F') {
+              currentBoolean = false;
+            } else {
+              // No boolean value match for extended 1 char field.
+              return false;
+            }
+          } else {
+            // No boolean value match for other lengths.
+            return false;
+          }
+        }
+        return true;
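The BOOLEAN case spells out both letter cases per byte. An equivalent byte-level alternative, not what the patch does, folds ASCII case with | 0x20 first:

// Alternative sketch (not the patch's code): ASCII-only case folding.
// Works because 'A'..'Z' and 'a'..'z' differ exactly in the 0x20 bit.
static boolean matchesIgnoreCaseAscii(byte[] bytes, int start, String literal) {
  for (int i = 0; i < literal.length(); i++) {
    if ((bytes[start + i] | 0x20) != (literal.charAt(i) | 0x20)) {
      return false;
    }
  }
  return true;
}

// e.g.: fieldLength == 4 && matchesIgnoreCaseAscii(bytes, fieldStart, "true")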
-    case BYTE:
-      if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
-        return true;
-      }
-      try {
-        currentByte = LazyByte.parseByte(bytes, fieldStart, fieldLength, 10);
-      } catch (NumberFormatException e) {
-        logExceptionMessage(bytes, fieldStart, fieldLength, "TINYINT");
-        return true;
-      }
-      break;
+      case BYTE:
+        if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
+          return false;
+        }
+        currentByte = LazyByte.parseByte(bytes, fieldStart, fieldLength, 10);
+        return true;
-    case SHORT:
-      if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
-        return true;
-      }
-      try {
-        currentShort = LazyShort.parseShort(bytes, fieldStart, fieldLength, 10);
-      } catch (NumberFormatException e) {
-        logExceptionMessage(bytes, fieldStart, fieldLength, "SMALLINT");
-        return true;
-      }
-      break;
+      case SHORT:
+        if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
+          return false;
+        }
+        currentShort = LazyShort.parseShort(bytes, fieldStart, fieldLength, 10);
+        return true;
-    case INT:
-      if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
-        return true;
-      }
-      try {
-        currentInt = LazyInteger.parseInt(bytes, fieldStart, fieldLength, 10);
-      } catch (NumberFormatException e) {
-        logExceptionMessage(bytes, fieldStart, fieldLength, "INT");
-        return true;
-      }
-      break;
+      case INT:
+        if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
+          return false;
+        }
+        currentInt = LazyInteger.parseInt(bytes, fieldStart, fieldLength, 10);
+        return true;
-    case LONG:
-      if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
-        return true;
-      }
-      try {
-        currentLong = LazyLong.parseLong(bytes, fieldStart, fieldLength, 10);
-      } catch (NumberFormatException e) {
-        logExceptionMessage(bytes, fieldStart, fieldLength, "BIGINT");
-        return true;
-      }
-      break;
+      case LONG:
+        if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
+          return false;
+        }
+        currentLong = LazyLong.parseLong(bytes, fieldStart, fieldLength, 10);
+        return true;
-    case FLOAT:
-      {
-        if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
-          return true;
-        }
-        String byteData = null;
-        try {
-          byteData = Text.decode(bytes, fieldStart, fieldLength);
-          currentFloat = Float.parseFloat(byteData);
-        } catch (NumberFormatException e) {
-          LOG.debug("Data not in the Float data type range so converted to null. Given data is :"
-              + byteData, e);
-          return true;
-        } catch (CharacterCodingException e) {
-          LOG.debug("Data not in the Float data type range so converted to null.", e);
-          return true;
-        }
-      }
-      break;
+      case FLOAT:
+        if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
+          return false;
+        }
+        currentFloat =
+            Float.parseFloat(
+                new String(bytes, fieldStart, fieldLength, StandardCharsets.UTF_8));
+        return true;
-    case DOUBLE:
-      {
-        if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
-          return true;
-        }
-        String byteData = null;
-        try {
-          byteData = Text.decode(bytes, fieldStart, fieldLength);
-          currentDouble = Double.parseDouble(byteData);
-        } catch (NumberFormatException e) {
-          LOG.debug("Data not in the Double data type range so converted to null. Given data is :"
-              + byteData, e);
-          return true;
-        } catch (CharacterCodingException e) {
-          LOG.debug("Data not in the Double data type range so converted to null.", e);
-          return true;
-        }
-      }
-      break;
+      case DOUBLE:
+        if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
+          return false;
+        }
+        currentDouble =
+            Double.parseDouble(
+                new String(bytes, fieldStart, fieldLength, StandardCharsets.UTF_8));
+        return true;
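All the integer and floating-point cases now screen the bytes with LazyUtils.isNumberMaybe before parsing, so garbage fields become NULL without paying for a thrown NumberFormatException in the common case. A rough sketch of what such a screen can look like; the actual LazyUtils implementation may differ in detail:

// Rough "might be a number" pre-check sketch; deliberately permissive, since
// exact validation is left to the real parser.
static boolean isNumberMaybe(byte[] bytes, int start, int length) {
  if (length == 0) {
    return false;
  }
  for (int i = 0; i < length; i++) {
    byte b = bytes[start + i];
    switch (b) {
    case '+': case '-':
      if (i != 0) {          // sign is only plausible in front
        return false;
      }
      break;
    case '.': case 'e': case 'E':
      break;                 // plausible in floating-point text
    default:
      if (b < '0' || b > '9') {
        return false;        // definitely not a number
      }
    }
  }
  return true;
}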
-    case STRING:
-    case CHAR:
-    case VARCHAR:
-      {
-        if (isEscaped) {
-          // First calculate the length of the output string
-          int outputLength = 0;
-          for (int i = 0; i < fieldLength; i++) {
-            if (bytes[fieldStart + i] != escapeChar) {
-              outputLength++;
-            } else {
-              outputLength++;
-              i++;
-            }
-          }
-          if (outputLength == fieldLength) {
-            // No escaping.
-            currentExternalBufferNeeded = false;
-            currentBytes = bytes;
-            currentBytesStart = fieldStart;
-            currentBytesLength = outputLength;
-          } else {
-            if (useExternalBuffer) {
-              currentExternalBufferNeeded = true;
-              currentExternalBufferNeededLen = outputLength;
-            } else {
-              // The copyToBuffer will reposition and re-read the input buffer.
-              currentExternalBufferNeeded = false;
-              if (internalBufferLen < outputLength) {
-                internalBufferLen = outputLength;
-                internalBuffer = new byte[internalBufferLen];
-              }
-              copyToBuffer(internalBuffer, 0, outputLength);
-              currentBytes = internalBuffer;
-              currentBytesStart = 0;
-              currentBytesLength = outputLength;
-            }
-          }
-        } else {
-          // If the data is not escaped, reference the data directly.
-          currentExternalBufferNeeded = false;
-          currentBytes = bytes;
-          currentBytesStart = fieldStart;
-          currentBytesLength = fieldLength;
-        }
-      }
-      break;
+      case STRING:
+      case CHAR:
+      case VARCHAR:
+        {
+          if (isEscaped) {
+            if (escapeCounts[fieldIndex] == 0) {
+              // No escaping.
+              currentExternalBufferNeeded = false;
+              currentBytes = bytes;
+              currentBytesStart = fieldStart;
+              currentBytesLength = fieldLength;
+            } else {
+              final int unescapedLength = fieldLength - escapeCounts[fieldIndex];
+              if (useExternalBuffer) {
+                currentExternalBufferNeeded = true;
+                currentExternalBufferNeededLen = unescapedLength;
+              } else {
+                // The copyToBuffer will reposition and re-read the input buffer.
+                currentExternalBufferNeeded = false;
+                if (internalBufferLen < unescapedLength) {
+                  internalBufferLen = unescapedLength;
+                  internalBuffer = new byte[internalBufferLen];
+                }
+                copyToBuffer(internalBuffer, 0, unescapedLength);
+                currentBytes = internalBuffer;
+                currentBytesStart = 0;
+                currentBytesLength = unescapedLength;
+              }
+            }
+          } else {
+            // If the data is not escaped, reference the data directly.
+            currentExternalBufferNeeded = false;
+            currentBytes = bytes;
+            currentBytesStart = fieldStart;
+            currentBytesLength = fieldLength;
+          }
+        }
+        return true;
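Because parse() already counted the escapes per field, the string case can tell a clean field (escapeCounts[fieldIndex] == 0, referenced in place with no copy) from one that needs an unescaping copy of exactly fieldLength - escapeCounts[fieldIndex] bytes. The copy itself works roughly like this simplified sketch; the real copyToBuffer, which appears later in this diff, may special-case certain escaped characters:

// Simplified unescape copy: drops each escape byte, keeps the byte after it.
// The destination must have room for fieldLength minus the escape count.
static void unescapeCopy(byte[] src, int srcStart, int fieldLength,
    byte escapeChar, byte[] dst, int dstStart) {
  int k = dstStart;
  for (int i = 0; i < fieldLength; i++) {
    byte b = src[srcStart + i];
    if (b == escapeChar && i + 1 < fieldLength) {
      i++;                          // skip the escape byte itself
      dst[k++] = src[srcStart + i];
    } else {
      dst[k++] = b;
    }
  }
}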
-    case BINARY:
-      {
-        byte[] recv = new byte[fieldLength];
-        System.arraycopy(bytes, fieldStart, recv, 0, fieldLength);
-        byte[] decoded = LazyBinary.decodeIfNeeded(recv);
-        // use the original bytes in case decoding should fail
-        decoded = decoded.length > 0 ? decoded : recv;
-        currentBytes = decoded;
-        currentBytesStart = 0;
-        currentBytesLength = decoded.length;
-      }
-      break;
+      case BINARY:
+        {
+          byte[] recv = new byte[fieldLength];
+          System.arraycopy(bytes, fieldStart, recv, 0, fieldLength);
+          byte[] decoded = LazyBinary.decodeIfNeeded(recv);
+          // use the original bytes in case decoding should fail
+          decoded = decoded.length > 0 ? decoded : recv;
+          currentBytes = decoded;
+          currentBytesStart = 0;
+          currentBytesLength = decoded.length;
+        }
+        return true;
-    case DATE:
-      {
-        if (!LazyUtils.isDateMaybe(bytes, fieldStart, fieldLength)) {
-          return true;
-        }
-        String s = null;
-        try {
-          s = Text.decode(bytes, fieldStart, fieldLength);
-          currentDateWritable.set(Date.valueOf(s));
-        } catch (Exception e) {
-          logExceptionMessage(bytes, fieldStart, fieldLength, "DATE");
-          return true;
-        }
-      }
-      break;
+      case DATE:
+        if (!LazyUtils.isDateMaybe(bytes, fieldStart, fieldLength)) {
+          return false;
+        }
+        currentDateWritable.set(
+            Date.valueOf(
+                new String(bytes, fieldStart, fieldLength, StandardCharsets.UTF_8)));
+        return true;
-    case TIMESTAMP:
-      {
-        if (!LazyUtils.isDateMaybe(bytes, fieldStart, fieldLength)) {
-          return true;
-        }
-        String s = null;
-        try {
-          s = new String(bytes, fieldStart, fieldLength, "US-ASCII");
-        } catch (UnsupportedEncodingException e) {
-          LOG.error("Unsupported encoding found ", e);
-          s = "";
-        }
-        if (s.compareTo("NULL") == 0) {
-          logExceptionMessage(bytes, fieldStart, fieldLength, "TIMESTAMP");
-          return true;
-        } else {
-          try {
-            if (timestampParser == null) {
-              timestampParser = new TimestampParser();
-            }
-            currentTimestampWritable.set(timestampParser.parseTimestamp(s));
-          } catch (IllegalArgumentException e) {
-            logExceptionMessage(bytes, fieldStart, fieldLength, "TIMESTAMP");
-            return true;
-          }
-        }
-      }
-      break;
+      case TIMESTAMP:
+        {
+          if (!LazyUtils.isDateMaybe(bytes, fieldStart, fieldLength)) {
+            return false;
+          }
+          String s = new String(bytes, fieldStart, fieldLength, StandardCharsets.US_ASCII);
+          if (s.compareTo("NULL") == 0) {
+            logExceptionMessage(bytes, fieldStart, fieldLength, "TIMESTAMP");
+            return false;
+          }
+          try {
+            currentTimestampWritable.set(timestampParser.parseTimestamp(s));
+          } catch (IllegalArgumentException e) {
+            logExceptionMessage(bytes, fieldStart, fieldLength, "TIMESTAMP");
+            return false;
+          }
+        }
+        return true;
-    case INTERVAL_YEAR_MONTH:
-      {
-        if (fieldLength == 0) {
-          return true;
-        }
-        String s = null;
-        try {
-          s = Text.decode(bytes, fieldStart, fieldLength);
-          currentHiveIntervalYearMonthWritable.set(HiveIntervalYearMonth.valueOf(s));
-        } catch (Exception e) {
-          logExceptionMessage(bytes, fieldStart, fieldLength, "INTERVAL_YEAR_MONTH");
-          return true;
-        }
-      }
-      break;
+      case INTERVAL_YEAR_MONTH:
+        if (fieldLength == 0) {
+          return false;
+        }
+        try {
+          String s = new String(bytes, fieldStart, fieldLength, StandardCharsets.UTF_8);
+          currentHiveIntervalYearMonthWritable.set(HiveIntervalYearMonth.valueOf(s));
+        } catch (Exception e) {
+          logExceptionMessage(bytes, fieldStart, fieldLength, "INTERVAL_YEAR_MONTH");
+          return false;
+        }
+        return true;
-    case INTERVAL_DAY_TIME:
-      {
-        if (fieldLength == 0) {
-          return true;
-        }
-        String s = null;
-        try {
-          s = Text.decode(bytes, fieldStart, fieldLength);
-          currentHiveIntervalDayTimeWritable.set(HiveIntervalDayTime.valueOf(s));
-        } catch (Exception e) {
-          logExceptionMessage(bytes, fieldStart, fieldLength, "INTERVAL_DAY_TIME");
-          return true;
-        }
-      }
-      break;
+      case INTERVAL_DAY_TIME:
+        if (fieldLength == 0) {
+          return false;
+        }
+        try {
+          String s = new String(bytes, fieldStart, fieldLength, StandardCharsets.UTF_8);
+          currentHiveIntervalDayTimeWritable.set(HiveIntervalDayTime.valueOf(s));
+        } catch (Exception e) {
+          logExceptionMessage(bytes, fieldStart, fieldLength, "INTERVAL_DAY_TIME");
+          return false;
+        }
+        return true;
-    case DECIMAL:
-      {
-        if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
-          return true;
-        }
-        String byteData = null;
-        try {
-          byteData = Text.decode(bytes, fieldStart, fieldLength);
-        } catch (CharacterCodingException e) {
-          LOG.debug("Data not in the HiveDecimal data type range so converted to null.", e);
-          return true;
-        }
-        HiveDecimal decimal = HiveDecimal.create(byteData);
-        DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfos[fieldIndex];
-        int precision = decimalTypeInfo.getPrecision();
-        int scale = decimalTypeInfo.getScale();
-        decimal = HiveDecimal.enforcePrecisionScale(
-            decimal, precision, scale);
-        if (decimal == null) {
-          LOG.debug("Data not in the HiveDecimal data type range so converted to null. Given data is :"
-              + byteData);
-          return true;
-        }
-        currentHiveDecimalWritable.set(decimal);
-      }
-      break;
+      case DECIMAL:
+        {
+          if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
+            return false;
+          }
+          String byteData = new String(bytes, fieldStart, fieldLength, StandardCharsets.UTF_8);
+          HiveDecimal decimal = HiveDecimal.create(byteData);
+          DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfos[fieldIndex];
+          int precision = decimalTypeInfo.getPrecision();
+          int scale = decimalTypeInfo.getScale();
+          decimal = HiveDecimal.enforcePrecisionScale(decimal, precision, scale);
+          if (decimal == null) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Data not in the HiveDecimal data type range so converted to null. Given data is :"
+                  + byteData);
+            }
+            return false;
+          }
+          currentHiveDecimalWritable.set(decimal);
+        }
+        return true;
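The DECIMAL case follows a parse-then-clamp pattern: build the HiveDecimal, then enforce the column's declared precision and scale, where a null result means the value does not fit and the field becomes SQL NULL. The same pattern in isolation, using the HiveDecimal and DecimalTypeInfo calls exactly as the diff does:

import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;

// Parse-then-clamp sketch mirroring the DECIMAL case above; enforcePrecisionScale
// returns null when the value does not fit the declared precision/scale.
final class DecimalParseSketch {
  static HiveDecimal parseAndEnforce(String text, DecimalTypeInfo typeInfo) {
    HiveDecimal decimal = HiveDecimal.create(text);
    return HiveDecimal.enforcePrecisionScale(
        decimal, typeInfo.getPrecision(), typeInfo.getScale());
  }
}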
Given data is :" + + byteData); + } + return false; + } + currentHiveDecimalWritable.set(decimal); } + return true; - HiveDecimal decimal = HiveDecimal.create(byteData); - DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfos[fieldIndex]; - int precision = decimalTypeInfo.getPrecision(); - int scale = decimalTypeInfo.getScale(); - decimal = HiveDecimal.enforcePrecisionScale( - decimal, precision, scale); - if (decimal == null) { - LOG.debug("Data not in the HiveDecimal data type range so converted to null. Given data is :" - + byteData); - return true; - } - currentHiveDecimalWritable.set(decimal); + default: + throw new Error("Unexpected primitive category " + primitiveCategories[fieldIndex].name()); } - break; - - default: - throw new Error("Unexpected primitive category " + primitiveCategories[fieldIndex].name()); + } catch (NumberFormatException nfe) { + // U+FFFD will throw this as well + logExceptionMessage(bytes, fieldStart, fieldLength, primitiveCategories[fieldIndex]); + return false; } - - return false; } @Override @@ -570,6 +589,8 @@ public final class LazySimpleDeserializeRead extends DeserializeRead { } private void copyToBuffer(byte[] buffer, int bufferStart, int bufferLength) { + + final int fieldStart = currentFieldStart; int k = 0; for (int i = 0; i < bufferLength; i++) { byte b = bytes[fieldStart + i]; @@ -590,9 +611,44 @@ public final class LazySimpleDeserializeRead extends DeserializeRead { } } + /* + * Call this method may be called after all the all fields have been read to check + * for unread fields. + * + * Note that when optimizing reading to stop reading unneeded include columns, worrying + * about whether all data is consumed is not appropriate (often we aren't reading it all by + * design). + * + * Since LazySimpleDeserializeRead parses the line through the last desired column it does + * support this function. + */ + public boolean isEndOfInputReached() { + return isEndOfInputReached; + } + + public void logExceptionMessage(byte[] bytes, int bytesStart, int bytesLength, + PrimitiveCategory dataCategory) { + final String dataType; + switch (dataCategory) { + case BYTE: + dataType = "TINYINT"; + break; + case LONG: + dataType = "BIGINT"; + break; + case SHORT: + dataType = "SMALLINT"; + break; + default: + dataType = dataCategory.toString(); + break; + } + logExceptionMessage(bytes, bytesStart, bytesLength, dataType); + } + public void logExceptionMessage(byte[] bytes, int bytesStart, int bytesLength, String dataType) { try { - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { String byteData = Text.decode(bytes, bytesStart, bytesLength); LOG.debug("Data not in the " + dataType + " data type range so converted to null. Given data is :" + @@ -603,38 +659,6 @@ public final class LazySimpleDeserializeRead extends DeserializeRead { } } - /* - * Call this method after all fields have been read to check for extra fields. - */ - @Override - public void extraFieldsCheck() { - // UNDONE: Get rid of... - } - - /* - * Read integrity warning flags. - */ - @Override - public boolean readBeyondConfiguredFieldsWarned() { - return missingFieldWarned; - } - @Override - public boolean bufferRangeHasExtraDataWarned() { - return false; - } - - private void doExtraFieldWarned() { - extraFieldWarned = true; - LOG.warn("Extra bytes detected at the end of the row! Ignoring similar " - + "problems."); - } - - private void doMissingFieldWarned(int fieldId) { - missingFieldWarned = true; - LOG.info("Missing fields! 
Expected " + fieldCount + " fields but " - + "only got " + fieldId + "! Ignoring similar problems."); - } - //------------------------------------------------------------------------------------------------ private static byte[] maxLongBytes = ((Long) Long.MAX_VALUE).toString().getBytes();