Repository: spark
Updated Branches:
  refs/heads/branch-2.0 b8f65dad7 -> c0ea77071
Revert "[SPARK-16334] Reusing same dictionary column for decoding consecutive row groups shouldn't throw an error"

This reverts commit a3930c3b9afa9f7eba2a5c8b8f279ca38e348e9b.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c0ea7707
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c0ea7707
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c0ea7707

Branch: refs/heads/branch-2.0
Commit: c0ea7707127c92ecb51794b96ea40d7cdb28b168
Parents: b8f65da
Author: Davies Liu <davies....@gmail.com>
Authored: Fri Sep 2 16:05:37 2016 -0700
Committer: Davies Liu <davies....@gmail.com>
Committed: Fri Sep 2 16:05:37 2016 -0700

----------------------------------------------------------------------
 .../parquet/VectorizedColumnReader.java | 54 ++++++--------------------
 1 file changed, 16 insertions(+), 38 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/c0ea7707/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index cb51cb4..6c47dc0 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -221,21 +221,15 @@ public class VectorizedColumnReader {
         if (column.dataType() == DataTypes.IntegerType ||
             DecimalType.is32BitDecimalType(column.dataType())) {
           for (int i = rowId; i < rowId + num; ++i) {
-            if (!column.isNullAt(i)) {
-              column.putInt(i, dictionary.decodeToInt(dictionaryIds.getDictId(i)));
-            }
+            column.putInt(i, dictionary.decodeToInt(dictionaryIds.getInt(i)));
           }
         } else if (column.dataType() == DataTypes.ByteType) {
           for (int i = rowId; i < rowId + num; ++i) {
-            if (!column.isNullAt(i)) {
-              column.putByte(i, (byte) dictionary.decodeToInt(dictionaryIds.getDictId(i)));
-            }
+            column.putByte(i, (byte) dictionary.decodeToInt(dictionaryIds.getInt(i)));
           }
         } else if (column.dataType() == DataTypes.ShortType) {
           for (int i = rowId; i < rowId + num; ++i) {
-            if (!column.isNullAt(i)) {
-              column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getDictId(i)));
-            }
+            column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getInt(i)));
           }
         } else {
           throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
@@ -246,9 +240,7 @@ public class VectorizedColumnReader {
         if (column.dataType() == DataTypes.LongType ||
             DecimalType.is64BitDecimalType(column.dataType())) {
           for (int i = rowId; i < rowId + num; ++i) {
-            if (!column.isNullAt(i)) {
-              column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i)));
-            }
+            column.putLong(i, dictionary.decodeToLong(dictionaryIds.getInt(i)));
           }
         } else {
           throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
@@ -257,27 +249,21 @@ public class VectorizedColumnReader {
 
       case FLOAT:
         for (int i = rowId; i < rowId + num; ++i) {
-          if (!column.isNullAt(i)) {
-            column.putFloat(i, dictionary.decodeToFloat(dictionaryIds.getDictId(i)));
-          }
+          column.putFloat(i, dictionary.decodeToFloat(dictionaryIds.getInt(i)));
         }
         break;
 
       case DOUBLE:
         for (int i = rowId; i < rowId + num; ++i) {
-          if (!column.isNullAt(i)) {
-            column.putDouble(i, dictionary.decodeToDouble(dictionaryIds.getDictId(i)));
-          }
+          column.putDouble(i, dictionary.decodeToDouble(dictionaryIds.getInt(i)));
         }
         break;
       case INT96:
         if (column.dataType() == DataTypes.TimestampType) {
           for (int i = rowId; i < rowId + num; ++i) {
             // TODO: Convert dictionary of Binaries to dictionary of Longs
-            if (!column.isNullAt(i)) {
-              Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
-              column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
-            }
+            Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+            column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
           }
         } else {
           throw new UnsupportedOperationException();
@@ -289,34 +275,26 @@ public class VectorizedColumnReader {
         // and reuse it across batches. This should mean adding a ByteArray would just update
         // the length and offset.
         for (int i = rowId; i < rowId + num; ++i) {
-          if (!column.isNullAt(i)) {
-            Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
-            column.putByteArray(i, v.getBytes());
-          }
+          Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+          column.putByteArray(i, v.getBytes());
         }
         break;
       case FIXED_LEN_BYTE_ARRAY:
         // DecimalType written in the legacy mode
         if (DecimalType.is32BitDecimalType(column.dataType())) {
           for (int i = rowId; i < rowId + num; ++i) {
-            if (!column.isNullAt(i)) {
-              Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
-              column.putInt(i, (int) ParquetRowConverter.binaryToUnscaledLong(v));
-            }
+            Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+            column.putInt(i, (int) ParquetRowConverter.binaryToUnscaledLong(v));
           }
         } else if (DecimalType.is64BitDecimalType(column.dataType())) {
           for (int i = rowId; i < rowId + num; ++i) {
-            if (!column.isNullAt(i)) {
-              Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
-              column.putLong(i, ParquetRowConverter.binaryToUnscaledLong(v));
-            }
+            Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+            column.putLong(i, ParquetRowConverter.binaryToUnscaledLong(v));
           }
         } else if (DecimalType.isByteArrayDecimalType(column.dataType())) {
           for (int i = rowId; i < rowId + num; ++i) {
-            if (!column.isNullAt(i)) {
-              Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
-              column.putByteArray(i, v.getBytes());
-            }
+            Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+            column.putByteArray(i, v.getBytes());
           }
         } else {
           throw new UnsupportedOperationException();

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org