parthchandra commented on a change in pull request #35262:
URL: https://github.com/apache/spark/pull/35262#discussion_r790027843
##########
File path:
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
##########
@@ -283,25 +294,30 @@ private void initDataReader(
} catch (IOException e) {
throw new IOException("could not read page in col " + descriptor, e);
}
+ if (CorruptDeltaByteArrays.requiresSequentialReads(writerVersion,
dataEncoding) &&
+ previousReader != null && previousReader instanceof
RequiresPreviousReader) {
Review comment:
You're right, it is not needed
##########
File path:
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
##########
@@ -86,4 +87,26 @@ void readLongsWithRebase(
void write(WritableColumnVector outputColumnVector, int rowId, long val);
}
+ @FunctionalInterface
+ interface ByteBufferOutputWriter {
+ void write(WritableColumnVector c, int rowId, ByteBuffer val, int length);
+
+ static void writeArrayByteBuffer(WritableColumnVector c, int rowId,
ByteBuffer val,
Review comment:
I don't know if it is frowned upon. In this case, not including it in the
interface would only lead to some code bloat.
##########
File path:
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java
##########
@@ -16,51 +16,133 @@
*/
package org.apache.spark.sql.execution.datasources.parquet;
+import static org.apache.spark.sql.types.DataTypes.BinaryType;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+
import org.apache.parquet.bytes.ByteBufferInputStream;
-import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader;
+import org.apache.parquet.column.values.RequiresPreviousReader;
+import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.api.Binary;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
- * An implementation of the Parquet DELTA_BYTE_ARRAY decoder that supports the
vectorized interface.
+ * An implementation of the Parquet DELTA_BYTE_ARRAY decoder that supports the
vectorized
+ * interface.
*/
-public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase {
- private final DeltaByteArrayReader deltaByteArrayReader = new
DeltaByteArrayReader();
+public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase
+ implements VectorizedValuesReader, RequiresPreviousReader {
+
+ private final MemoryMode memoryMode;
+ private int valueCount;
+ private final VectorizedDeltaBinaryPackedReader prefixLengthReader =
+ new VectorizedDeltaBinaryPackedReader();
+ private final VectorizedDeltaLengthByteArrayReader suffixReader;
+ private WritableColumnVector prefixLengthVector;
+ private WritableColumnVector suffixVector;
+ private byte[] previous = new byte[0];
+ private int currentRow = 0;
+
+ //temporary variable used by getBinary
+ Binary binaryVal;
Review comment:
done
##########
File path:
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java
##########
@@ -16,51 +16,133 @@
*/
package org.apache.spark.sql.execution.datasources.parquet;
+import static org.apache.spark.sql.types.DataTypes.BinaryType;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+
import org.apache.parquet.bytes.ByteBufferInputStream;
-import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader;
+import org.apache.parquet.column.values.RequiresPreviousReader;
+import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.api.Binary;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
- * An implementation of the Parquet DELTA_BYTE_ARRAY decoder that supports the
vectorized interface.
+ * An implementation of the Parquet DELTA_BYTE_ARRAY decoder that supports the
vectorized
+ * interface.
*/
-public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase {
- private final DeltaByteArrayReader deltaByteArrayReader = new
DeltaByteArrayReader();
+public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase
+ implements VectorizedValuesReader, RequiresPreviousReader {
+
+ private final MemoryMode memoryMode;
+ private int valueCount;
+ private final VectorizedDeltaBinaryPackedReader prefixLengthReader =
+ new VectorizedDeltaBinaryPackedReader();
+ private final VectorizedDeltaLengthByteArrayReader suffixReader;
+ private WritableColumnVector prefixLengthVector;
+ private WritableColumnVector suffixVector;
+ private byte[] previous = new byte[0];
+ private int currentRow = 0;
+
+ //temporary variable used by getBinary
+ Binary binaryVal;
+
+ VectorizedDeltaByteArrayReader(MemoryMode memoryMode){
+ this.memoryMode = memoryMode;
+ this.suffixReader = new VectorizedDeltaLengthByteArrayReader(memoryMode);
+ }
@Override
public void initFromPage(int valueCount, ByteBufferInputStream in) throws
IOException {
- deltaByteArrayReader.initFromPage(valueCount, in);
+ this.valueCount = valueCount;
Review comment:
Nope, it doesn't. Removed.
##########
File path:
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java
##########
@@ -16,51 +16,133 @@
*/
package org.apache.spark.sql.execution.datasources.parquet;
+import static org.apache.spark.sql.types.DataTypes.BinaryType;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+
import org.apache.parquet.bytes.ByteBufferInputStream;
-import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader;
+import org.apache.parquet.column.values.RequiresPreviousReader;
+import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.api.Binary;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
- * An implementation of the Parquet DELTA_BYTE_ARRAY decoder that supports the
vectorized interface.
+ * An implementation of the Parquet DELTA_BYTE_ARRAY decoder that supports the
vectorized
+ * interface.
*/
-public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase {
- private final DeltaByteArrayReader deltaByteArrayReader = new
DeltaByteArrayReader();
+public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase
+ implements VectorizedValuesReader, RequiresPreviousReader {
+
+ private final MemoryMode memoryMode;
+ private int valueCount;
+ private final VectorizedDeltaBinaryPackedReader prefixLengthReader =
+ new VectorizedDeltaBinaryPackedReader();
+ private final VectorizedDeltaLengthByteArrayReader suffixReader;
+ private WritableColumnVector prefixLengthVector;
+ private WritableColumnVector suffixVector;
+ private byte[] previous = new byte[0];
+ private int currentRow = 0;
+
+ //temporary variable used by getBinary
+ Binary binaryVal;
+
+ VectorizedDeltaByteArrayReader(MemoryMode memoryMode){
+ this.memoryMode = memoryMode;
+ this.suffixReader = new VectorizedDeltaLengthByteArrayReader(memoryMode);
+ }
@Override
public void initFromPage(int valueCount, ByteBufferInputStream in) throws
IOException {
- deltaByteArrayReader.initFromPage(valueCount, in);
+ this.valueCount = valueCount;
+ if (memoryMode == MemoryMode.OFF_HEAP) {
+ prefixLengthVector = new OffHeapColumnVector(valueCount, IntegerType);
+ suffixVector = new OffHeapColumnVector(valueCount, BinaryType);
+ } else {
+ prefixLengthVector = new OnHeapColumnVector(valueCount, IntegerType);
+ suffixVector = new OnHeapColumnVector(valueCount, BinaryType);
+ }
+ prefixLengthReader.initFromPage(valueCount, in);
+ prefixLengthReader.readIntegers(prefixLengthReader.getTotalValueCount(),
+ prefixLengthVector, 0);
+ suffixReader.initFromPage(valueCount, in);
+ suffixReader.readBinary(valueCount, suffixVector, 0);
}
@Override
public Binary readBinary(int len) {
- return deltaByteArrayReader.readBytes();
+ readValues(1, null, 0,
+ (w, r, v, l) ->
Review comment:
I really hope not. AFAIK, lambdas are highly optimized so they do not incur
object-creation overhead. I'm not sure whether the function-call overhead is
also eliminated by inlining.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]