sunchao commented on a change in pull request #35262:
URL: https://github.com/apache/spark/pull/35262#discussion_r820013541



##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java
##########
@@ -16,51 +16,115 @@
  */
 package org.apache.spark.sql.execution.datasources.parquet;
 
+import static org.apache.spark.sql.types.DataTypes.BinaryType;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+
 import org.apache.parquet.bytes.ByteBufferInputStream;
-import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader;
+import org.apache.parquet.column.values.RequiresPreviousReader;
+import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.io.api.Binary;
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
 /**
- * An implementation of the Parquet DELTA_BYTE_ARRAY decoder that supports the vectorized interface.
+ * An implementation of the Parquet DELTA_BYTE_ARRAY decoder that supports the vectorized
+ * interface.
  */
-public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase {
-  private final DeltaByteArrayReader deltaByteArrayReader = new DeltaByteArrayReader();
+public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase
+    implements VectorizedValuesReader, RequiresPreviousReader {
+
+  private final VectorizedDeltaBinaryPackedReader prefixLengthReader;
+  private final VectorizedDeltaLengthByteArrayReader suffixReader;
+  private WritableColumnVector prefixLengthVector;
+  private WritableColumnVector suffixVector;
+  private byte[] previous = new byte[0];
+  private int currentRow = 0;
+
+  // temporary variable used by getBinary
+  private final WritableColumnVector binaryValVector;
+
+  VectorizedDeltaByteArrayReader() {
+    this.prefixLengthReader = new VectorizedDeltaBinaryPackedReader();
+    this.suffixReader = new VectorizedDeltaLengthByteArrayReader();
+    binaryValVector = new OnHeapColumnVector(1, BinaryType);
+  }
 
   @Override
  public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
-    deltaByteArrayReader.initFromPage(valueCount, in);
+    prefixLengthVector = new OnHeapColumnVector(valueCount, IntegerType);
+    suffixVector = new OnHeapColumnVector(valueCount, BinaryType);
+    prefixLengthReader.initFromPage(valueCount, in);
+    prefixLengthReader.readIntegers(prefixLengthReader.getTotalValueCount(),
+        prefixLengthVector, 0);
+    suffixReader.initFromPage(valueCount, in);
+    suffixReader.readBinary(prefixLengthReader.getTotalValueCount(), suffixVector, 0);
   }
 
   @Override
   public Binary readBinary(int len) {
-    return deltaByteArrayReader.readBytes();
+    readValues(1, binaryValVector, 0, ByteBufferOutputWriter::writeArrayByteBuffer);
+    return Binary.fromConstantByteArray(binaryValVector.getBinary(0));
   }
 
-  @Override
-  public void readBinary(int total, WritableColumnVector c, int rowId) {
+  private void readValues(int total, WritableColumnVector c, int rowId,
+      ByteBufferOutputWriter outputWriter) {
     for (int i = 0; i < total; i++) {
-      Binary binary = deltaByteArrayReader.readBytes();
-      ByteBuffer buffer = binary.toByteBuffer();
-      if (buffer.hasArray()) {
-      c.putByteArray(rowId + i, buffer.array(), buffer.arrayOffset() + buffer.position(),
-          binary.length());
+      // NOTE: due to PARQUET-246, it is important that we
+      // respect prefixLength which was read from prefixLengthReader,
+      // even for the *first* value of a page. Even though the first
+      // value of the page should have an empty prefix, it may not
+      // because of PARQUET-246.
+      int prefixLength = prefixLengthVector.getInt(currentRow);
+      byte[] suffix = suffixVector.getBinary(currentRow);
+      int length = prefixLength + suffix.length;
+
+      // We have to do this to materialize the output
+      if (prefixLength != 0) {
+        // We could do
+        //  c.putByteArray(rowId + i, previous, 0, prefixLength);
+        //  c.putByteArray(rowId+i, suffix, prefixLength, suffix.length);
+        //  previous =  c.getBinary(rowId+1);
+      // but it incurs the same cost of copying the values twice _and_ c.getBinary
+        // is a _slow_ byte by byte copy
+        // The following always uses the faster system arraycopy method
+        byte[] out = new byte[length];

Review comment:
   We can also potentially skip this copying, at least for `OnHeapColumnVector`. [I tried it](https://github.com/sunchao/spark/commit/95f272c39a19cb1abdbc6c79e96b8fae435a96ab) and it gives some extra performance improvement.
   
   ```
   [info] OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Mac OS X 10.16
   [info] Intel(R) Core(TM) i9-10910 CPU @ 3.60GHz
   [info] String with Nulls Scan (0.0%):            Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
   [info] ------------------------------------------------------------------------------------------------------------------------
   [info] SQL CSV                                            5721           5727           8          1.8         545.6       1.0X
   [info] SQL Json                                           6289           6295           9          1.7         599.7       0.9X
   [info] SQL Parquet Vectorized: DataPageV1                  700            800          87         15.0          66.7       8.2X
   [info] SQL Parquet Vectorized: DataPageV2                  994           1031          52         10.5          94.8       5.8X
   [info] SQL Parquet MR: DataPageV1                         2035           2051          23          5.2         194.1       2.8X
   [info] SQL Parquet MR: DataPageV2                         2289           2454         232          4.6         218.3       2.5X
   [info] ParquetReader Vectorized: DataPageV1                472            482          15         22.2          45.0      12.1X
   [info] ParquetReader Vectorized: DataPageV2                640            645           4         16.4          61.0       8.9X
   [info] SQL ORC Vectorized                                  670            694          35         15.7          63.9       8.5X
   [info] SQL ORC MR                                         1846           2047         284          5.7         176.0       3.1X
   
   [info] OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Mac OS X 10.16
   [info] Intel(R) Core(TM) i9-10910 CPU @ 3.60GHz
   [info] String with Nulls Scan (50.0%):           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
   [info] ------------------------------------------------------------------------------------------------------------------------
   [info] SQL CSV                                            4825           4890          91          2.2         460.2       1.0X
   [info] SQL Json                                           5298           7385        2951          2.0         505.3       0.9X
   [info] SQL Parquet Vectorized: DataPageV1                  701            889         169         14.9          66.9       6.9X
   [info] SQL Parquet Vectorized: DataPageV2                  684            737          58         15.3          65.2       7.1X
   [info] SQL Parquet MR: DataPageV1                         1857           1869          17          5.6         177.1       2.6X
   [info] SQL Parquet MR: DataPageV2                         2034           2146         159          5.2         193.9       2.4X
   [info] ParquetReader Vectorized: DataPageV1                474            493          11         22.1          45.2      10.2X
   [info] ParquetReader Vectorized: DataPageV2                585            586           1         17.9          55.8       8.2X
   [info] SQL ORC Vectorized                                  810            845          53         12.9          77.3       6.0X
   [info] SQL ORC MR                                         1854           1935         114          5.7         176.8       2.6X
   
   [info] OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Mac OS X 10.16
   [info] Intel(R) Core(TM) i9-10910 CPU @ 3.60GHz
   [info] String with Nulls Scan (95.0%):           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
   [info] ------------------------------------------------------------------------------------------------------------------------
   [info] SQL CSV                                            3212           3256          63          3.3         306.3       1.0X
   [info] SQL Json                                           3693           3695           3          2.8         352.2       0.9X
   [info] SQL Parquet Vectorized: DataPageV1                  147            203          46         71.2          14.0      21.8X
   [info] SQL Parquet Vectorized: DataPageV2                  160            286         144         65.4          15.3      20.0X
   [info] SQL Parquet MR: DataPageV1                         1229           1351         172          8.5         117.2       2.6X
   [info] SQL Parquet MR: DataPageV2                         1074           1099          36          9.8         102.4       3.0X
   [info] ParquetReader Vectorized: DataPageV1                107            109           2         97.9          10.2      30.0X
   [info] ParquetReader Vectorized: DataPageV2                124            127           2         84.7          11.8      25.9X
   [info] SQL ORC Vectorized                                  262            308          86         40.0          25.0      12.3X
   [info] SQL ORC MR                                         1002           1070          96         10.5          95.5       3.2X
   ```
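   
   Roughly, the idea is to append the prefix and suffix straight into the vector's child byte array and only record the `(offset, length)` entry, so each value is written once instead of being materialized in a temporary `out` buffer and then copied again via `putByteArray`. A minimal sketch of that direction is below; it is only an illustration, not the exact code in the commit above. It relies on the existing `WritableColumnVector` methods `arrayData()`, `getElementsAppended()`, `appendBytes()` and `putArray()`, and the helper name `writeValueDirect` is made up for this example:
   
   ```java
   // Sketch only: write one decoded DELTA_BYTE_ARRAY value straight into an
   // OnHeapColumnVector's child data, skipping the intermediate byte[] copy.
   private void writeValueDirect(WritableColumnVector c, int rowId,
       int prefixLength, byte[] suffix, byte[] previous) {
     int length = prefixLength + suffix.length;
     WritableColumnVector arrayData = c.arrayData();
     int offset = arrayData.getElementsAppended();
     if (prefixLength != 0) {
       // the prefix is shared with the previously decoded value
       arrayData.appendBytes(prefixLength, previous, 0);
     }
     arrayData.appendBytes(suffix.length, suffix, 0);
     // only the (offset, length) entry is recorded; the bytes were written once
     c.putArray(rowId, offset, length);
     // `previous` still has to be tracked for the next value's prefix, e.g. as an
     // (offset, length) into arrayData rather than a separate byte[]
   }
   ```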



