LuciferYang commented on a change in pull request #35262:
URL: https://github.com/apache/spark/pull/35262#discussion_r797417782
##########
File path:
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java
##########
@@ -16,51 +16,121 @@
*/
package org.apache.spark.sql.execution.datasources.parquet;
+import static org.apache.spark.sql.types.DataTypes.BinaryType;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+
import org.apache.parquet.bytes.ByteBufferInputStream;
-import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader;
+import org.apache.parquet.column.values.RequiresPreviousReader;
+import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.api.Binary;
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
- * An implementation of the Parquet DELTA_BYTE_ARRAY decoder that supports the
vectorized interface.
+ * An implementation of the Parquet DELTA_BYTE_ARRAY decoder that supports the
vectorized
+ * interface.
*/
-public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase {
- private final DeltaByteArrayReader deltaByteArrayReader = new
DeltaByteArrayReader();
+public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase
+ implements VectorizedValuesReader, RequiresPreviousReader {
+
+ private final VectorizedDeltaBinaryPackedReader prefixLengthReader =
+ new VectorizedDeltaBinaryPackedReader();
+ private final VectorizedDeltaLengthByteArrayReader suffixReader;
+ private WritableColumnVector prefixLengthVector;
+ private WritableColumnVector suffixVector;
+ private byte[] previous = new byte[0];
+ private int currentRow = 0;
+
+ //temporary variable used by getBinary
+ private final WritableColumnVector binaryValVector;
+
+ VectorizedDeltaByteArrayReader() {
+ this.suffixReader = new VectorizedDeltaLengthByteArrayReader();
+ binaryValVector = new OnHeapColumnVector(1, BinaryType);
+ }
@Override
public void initFromPage(int valueCount, ByteBufferInputStream in) throws
IOException {
- deltaByteArrayReader.initFromPage(valueCount, in);
+ prefixLengthVector = new OnHeapColumnVector(valueCount, IntegerType);
+ suffixVector = new OnHeapColumnVector(valueCount, BinaryType);
+ prefixLengthReader.initFromPage(valueCount, in);
+ prefixLengthReader.readIntegers(prefixLengthReader.getTotalValueCount(),
+ prefixLengthVector, 0);
+ suffixReader.initFromPage(valueCount, in);
+ suffixReader.readBinary(valueCount, suffixVector, 0);
}
@Override
public Binary readBinary(int len) {
- return deltaByteArrayReader.readBytes();
+ readValues(1, binaryValVector, 0,
ByteBufferOutputWriter::writeArrayByteBuffer);
+ return Binary.fromConstantByteArray(binaryValVector.getBinary(0));
}
- @Override
- public void readBinary(int total, WritableColumnVector c, int rowId) {
+ public void readValues(int total, WritableColumnVector c, int rowId,
+ ByteBufferOutputWriter outputWriter) {
+ if (total == 0) {
+ return;
+ }
+
for (int i = 0; i < total; i++) {
- Binary binary = deltaByteArrayReader.readBytes();
- ByteBuffer buffer = binary.toByteBuffer();
- if (buffer.hasArray()) {
- c.putByteArray(rowId + i, buffer.array(), buffer.arrayOffset() +
buffer.position(),
- binary.length());
+ int prefixLength = prefixLengthVector.getInt(currentRow);
+ byte[] suffix = suffixVector.getBinary(currentRow);
+ // This does not copy bytes
+ int length = prefixLength + suffix.length;
+
+ // NOTE: due to PARQUET-246, it is important that we
+ // respect prefixLength which was read from prefixLengthReader,
+ // even for the *first* value of a page. Even though the first
+ // value of the page should have an empty prefix, it may not
+ // because of PARQUET-246.
+
+ // We have to do this to materialize the output
+ if (prefixLength != 0) {
+ // We could do
+ // c.putByteArray(rowId + i, previous, 0, prefixLength);
+ // c.putByteArray(rowId+i, suffix, prefixLength, suffix.length);
+ // previous = c.getBinary(rowId+1);
+ // but it incurs the same cost of copying the values twice _and_
c.getBinary
+ // is a _slow_ byte by byte copy
+ // The following always uses the faster system arraycopy method
+ byte[] out = new byte[length];
+ System.arraycopy(previous, 0, out, 0, prefixLength);
+ System.arraycopy(suffix, 0, out, prefixLength, suffix.length);
+ previous = out;
} else {
- byte[] bytes = new byte[binary.length()];
- buffer.get(bytes);
- c.putByteArray(rowId + i, bytes);
+ previous = suffix;
}
+ outputWriter.write(c, rowId + i, ByteBuffer.wrap(previous),
previous.length);
Review comment:
Is line 106 always equivalent to `c.putByteArray(rowId, previous, 0, length)`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]