parthchandra commented on a change in pull request #35262:
URL: https://github.com/apache/spark/pull/35262#discussion_r791231334



##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
##########
@@ -147,12 +147,19 @@ class ParquetEncodingSuite extends 
ParquetCompatibilityTest with SharedSparkSess
       withTempPath { dir =>
         val path = s"${dir.getCanonicalPath}/test.parquet"
 
-        val data = (1 to 3).map { i =>
-          ( i, i.toLong, i.toShort, Array[Byte](i.toByte), s"test_${i}",
-            DateTimeUtils.fromJavaDate(Date.valueOf(s"2021-11-0" + i)),
-            DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(s"2020-11-01 
12:00:0" + i)),
-            Period.of(1, i, 0), Duration.ofMillis(i * 100),
-            new BigDecimal(java.lang.Long.toUnsignedString(i*100000))
+        // Have more than 2 * 4096 records (so we have multiple tasks and each 
task
+        // reads at least twice from the reader). This will catch any issues 
with state
+        // maintained by the reader(s)
+        // Add at least one string with a null
+        val data = (1 to 8197).map { i =>
+          ( i,
+            i.toLong, i.toShort, Array[Byte](i.toByte),
+            if (i % 2 == 1) s"test_${i}" else null,

Review comment:
       done

##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java
##########
@@ -16,51 +16,133 @@
  */
 package org.apache.spark.sql.execution.datasources.parquet;
 
+import static org.apache.spark.sql.types.DataTypes.BinaryType;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+
 import org.apache.parquet.bytes.ByteBufferInputStream;
-import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader;
+import org.apache.parquet.column.values.RequiresPreviousReader;
+import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.io.api.Binary;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
 /**
- * An implementation of the Parquet DELTA_BYTE_ARRAY decoder that supports the 
vectorized interface.
+ * An implementation of the Parquet DELTA_BYTE_ARRAY decoder that supports the 
vectorized
+ * interface.
  */
-public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase {
-  private final DeltaByteArrayReader deltaByteArrayReader = new 
DeltaByteArrayReader();
+public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase
+    implements VectorizedValuesReader, RequiresPreviousReader {
+
+  private final MemoryMode memoryMode;
+  private int valueCount;
+  private final VectorizedDeltaBinaryPackedReader prefixLengthReader =
+      new VectorizedDeltaBinaryPackedReader();
+  private final VectorizedDeltaLengthByteArrayReader suffixReader;
+  private WritableColumnVector prefixLengthVector;
+  private WritableColumnVector suffixVector;
+  private byte[] previous = new byte[0];
+  private int currentRow = 0;
+
+  //temporary variable used by getBinary
+  Binary binaryVal;
+
+  VectorizedDeltaByteArrayReader(MemoryMode memoryMode){
+    this.memoryMode = memoryMode;
+    this.suffixReader = new VectorizedDeltaLengthByteArrayReader(memoryMode);
+  }
 
   @Override
   public void initFromPage(int valueCount, ByteBufferInputStream in) throws 
IOException {
-    deltaByteArrayReader.initFromPage(valueCount, in);
+    this.valueCount = valueCount;
+    if (memoryMode == MemoryMode.OFF_HEAP) {
+      prefixLengthVector = new OffHeapColumnVector(valueCount, IntegerType);
+      suffixVector = new OffHeapColumnVector(valueCount, BinaryType);
+    } else {
+      prefixLengthVector = new OnHeapColumnVector(valueCount, IntegerType);
+      suffixVector = new OnHeapColumnVector(valueCount, BinaryType);
+    }
+    prefixLengthReader.initFromPage(valueCount, in);
+    prefixLengthReader.readIntegers(prefixLengthReader.getTotalValueCount(),
+        prefixLengthVector, 0);
+    suffixReader.initFromPage(valueCount, in);
+    suffixReader.readBinary(valueCount, suffixVector, 0);
   }
 
   @Override
   public Binary readBinary(int len) {
-    return deltaByteArrayReader.readBytes();
+    readValues(1, null, 0,
+        (w, r, v, l) ->

Review comment:
       What you say makes sense: that the reference to the external variable 
will cause multiple object instantiations. Thank you for doing this research!
    I tried something similar, but with the unit test in ParquetEncodingSuite, 
and saw only a single instance of the lambda created (not sure why).
    I've changed the code to use a WritableVector of size 1 which eliminates 
the need to access the variable directly. 
    




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to