sunchao commented on a change in pull request #35262:
URL: https://github.com/apache/spark/pull/35262#discussion_r820013541
##########
File path:
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java
##########
@@ -16,51 +16,115 @@
*/
package org.apache.spark.sql.execution.datasources.parquet;
+import static org.apache.spark.sql.types.DataTypes.BinaryType;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+
import org.apache.parquet.bytes.ByteBufferInputStream;
-import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader;
+import org.apache.parquet.column.values.RequiresPreviousReader;
+import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.api.Binary;
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
- * An implementation of the Parquet DELTA_BYTE_ARRAY decoder that supports the
vectorized interface.
+ * An implementation of the Parquet DELTA_BYTE_ARRAY decoder that supports the
vectorized
+ * interface.
*/
-public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase {
- private final DeltaByteArrayReader deltaByteArrayReader = new
DeltaByteArrayReader();
+public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase
+ implements VectorizedValuesReader, RequiresPreviousReader {
+
+ private final VectorizedDeltaBinaryPackedReader prefixLengthReader;
+ private final VectorizedDeltaLengthByteArrayReader suffixReader;
+ private WritableColumnVector prefixLengthVector;
+ private WritableColumnVector suffixVector;
+ private byte[] previous = new byte[0];
+ private int currentRow = 0;
+
+ // temporary variable used by getBinary
+ private final WritableColumnVector binaryValVector;
+
+ VectorizedDeltaByteArrayReader() {
+ this.prefixLengthReader = new VectorizedDeltaBinaryPackedReader();
+ this.suffixReader = new VectorizedDeltaLengthByteArrayReader();
+ binaryValVector = new OnHeapColumnVector(1, BinaryType);
+ }
@Override
public void initFromPage(int valueCount, ByteBufferInputStream in) throws
IOException {
- deltaByteArrayReader.initFromPage(valueCount, in);
+ prefixLengthVector = new OnHeapColumnVector(valueCount, IntegerType);
+ suffixVector = new OnHeapColumnVector(valueCount, BinaryType);
+ prefixLengthReader.initFromPage(valueCount, in);
+ prefixLengthReader.readIntegers(prefixLengthReader.getTotalValueCount(),
+ prefixLengthVector, 0);
+ suffixReader.initFromPage(valueCount, in);
+ suffixReader.readBinary(prefixLengthReader.getTotalValueCount(),
suffixVector, 0);
}
@Override
public Binary readBinary(int len) {
- return deltaByteArrayReader.readBytes();
+ readValues(1, binaryValVector, 0,
ByteBufferOutputWriter::writeArrayByteBuffer);
+ return Binary.fromConstantByteArray(binaryValVector.getBinary(0));
}
- @Override
- public void readBinary(int total, WritableColumnVector c, int rowId) {
+ private void readValues(int total, WritableColumnVector c, int rowId,
+ ByteBufferOutputWriter outputWriter) {
for (int i = 0; i < total; i++) {
- Binary binary = deltaByteArrayReader.readBytes();
- ByteBuffer buffer = binary.toByteBuffer();
- if (buffer.hasArray()) {
- c.putByteArray(rowId + i, buffer.array(), buffer.arrayOffset() +
buffer.position(),
- binary.length());
+ // NOTE: due to PARQUET-246, it is important that we
+ // respect prefixLength which was read from prefixLengthReader,
+ // even for the *first* value of a page. Even though the first
+ // value of the page should have an empty prefix, it may not
+ // because of PARQUET-246.
+ int prefixLength = prefixLengthVector.getInt(currentRow);
+ byte[] suffix = suffixVector.getBinary(currentRow);
+ int length = prefixLength + suffix.length;
+
+ // We have to do this to materialize the output
+ if (prefixLength != 0) {
+ // We could do
+ // c.putByteArray(rowId + i, previous, 0, prefixLength);
+ // c.putByteArray(rowId+i, suffix, prefixLength, suffix.length);
+ // previous = c.getBinary(rowId+1);
+ // but it incurs the same cost of copying the values twice _and_
c.getBinary
+ // is a _slow_ byte by byte copy
+ // The following always uses the faster system arraycopy method
+ byte[] out = new byte[length];
Review comment:
We can also potentially skip this copying at least for
`OnHeapColumnVector`. [I tried
it](https://github.com/sunchao/spark/commit/95f272c39a19cb1abdbc6c79e96b8fae435a96ab)
and it gives some extra performance improvements.
```
[info] OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Mac OS X 10.16
[info] Intel(R) Core(TM) i9-10910 CPU @ 3.60GHz
[info] String with Nulls Scan (0.0%): Best Time(ms) Avg
Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
[info]
------------------------------------------------------------------------------------------------------------------------
[info] SQL CSV 5721
5727 8 1.8 545.6 1.0X
[info] SQL Json 6289
6295 9 1.7 599.7 0.9X
[info] SQL Parquet Vectorized: DataPageV1 700
800 87 15.0 66.7 8.2X
[info] SQL Parquet Vectorized: DataPageV2 994
1031 52 10.5 94.8 5.8X
[info] SQL Parquet MR: DataPageV1 2035
2051 23 5.2 194.1 2.8X
[info] SQL Parquet MR: DataPageV2 2289
2454 232 4.6 218.3 2.5X
[info] ParquetReader Vectorized: DataPageV1 472
482 15 22.2 45.0 12.1X
[info] ParquetReader Vectorized: DataPageV2 640
645 4 16.4 61.0 8.9X
[info] SQL ORC Vectorized 670
694 35 15.7 63.9 8.5X
[info] SQL ORC MR 1846
2047 284 5.7 176.0 3.1X
[info] OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Mac OS X 10.16
[info] Intel(R) Core(TM) i9-10910 CPU @ 3.60GHz
[info] String with Nulls Scan (50.0%): Best Time(ms) Avg
Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
[info]
------------------------------------------------------------------------------------------------------------------------
[info] SQL CSV 4825
4890 91 2.2 460.2 1.0X
[info] SQL Json 5298
7385 2951 2.0 505.3 0.9X
[info] SQL Parquet Vectorized: DataPageV1 701
889 169 14.9 66.9 6.9X
[info] SQL Parquet Vectorized: DataPageV2 684
737 58 15.3 65.2 7.1X
[info] SQL Parquet MR: DataPageV1 1857
1869 17 5.6 177.1 2.6X
[info] SQL Parquet MR: DataPageV2 2034
2146 159 5.2 193.9 2.4X
[info] ParquetReader Vectorized: DataPageV1 474
493 11 22.1 45.2 10.2X
[info] ParquetReader Vectorized: DataPageV2 585
586 1 17.9 55.8 8.2X
[info] SQL ORC Vectorized 810
845 53 12.9 77.3 6.0X
[info] SQL ORC MR 1854
1935 114 5.7 176.8 2.6X
[info] OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Mac OS X 10.16
[info] Intel(R) Core(TM) i9-10910 CPU @ 3.60GHz
[info] String with Nulls Scan (95.0%): Best Time(ms) Avg
Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
[info]
------------------------------------------------------------------------------------------------------------------------
[info] SQL CSV 3212
3256 63 3.3 306.3 1.0X
[info] SQL Json 3693
3695 3 2.8 352.2 0.9X
[info] SQL Parquet Vectorized: DataPageV1 147
203 46 71.2 14.0 21.8X
[info] SQL Parquet Vectorized: DataPageV2 160
286 144 65.4 15.3 20.0X
[info] SQL Parquet MR: DataPageV1 1229
1351 172 8.5 117.2 2.6X
[info] SQL Parquet MR: DataPageV2 1074
1099 36 9.8 102.4 3.0X
[info] ParquetReader Vectorized: DataPageV1 107
109 2 97.9 10.2 30.0X
[info] ParquetReader Vectorized: DataPageV2 124
127 2 84.7 11.8 25.9X
[info] SQL ORC Vectorized 262
308 86 40.0 25.0 12.3X
[info] SQL ORC MR 1002
1070 96 10.5 95.5 3.2X
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]