Github user henryr commented on a diff in the pull request:
https://github.com/apache/spark/pull/21070#discussion_r181540952
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
@@ -63,115 +58,139 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) {
     }
   }
+  private ByteBuffer getBuffer(int length) {
+    try {
+      return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
+    }
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int rowId) {
-    c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-    offset += 4 * total;
+    int requiredBytes = total * 4;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+
+    for (int i = 0; i < total; i += 1) {
--- End diff ---
Couldn't it also be writing to an `OffHeapColumnVector`?
https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java#L199

If so, I think the copy is done 1MB at a time:
https://github.com/apache/spark/blob/master/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java#L189

I agree that ByteBuffer support shouldn't be added in this PR. But there's an opportunity to use the bulk copy APIs, which would benefit from any future optimization that happens there. Plus, even if the copy does eventually become a loop inside the column vector implementation, there's a better chance of the JIT unrolling that loop, since it's smaller.
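
To sketch what I mean (purely illustrative, not code from this PR): something along these lines reuses the `getBuffer` helper from the diff plus the existing `putIntsLittleEndian(rowId, count, byte[], srcIndex)` overload, and only falls back to a per-element loop when the slice isn't backed by a heap array (e.g. if the underlying pages are in direct buffers).

```java
@Override
public final void readIntegers(int total, WritableColumnVector c, int rowId) {
  int requiredBytes = total * 4;
  ByteBuffer buffer = getBuffer(requiredBytes);

  if (buffer.hasArray()) {
    // Bulk path: hand the whole run to the column vector in one call, so the
    // copy goes through Platform.copyMemory (and picks up any future
    // optimization) whether `c` is on-heap or off-heap.
    c.putIntsLittleEndian(rowId, total, buffer.array(),
        buffer.arrayOffset() + buffer.position());
  } else {
    // Fallback for direct buffers: per-element copy. getInt() honors the
    // little-endian order set in getBuffer().
    for (int i = 0; i < total; i += 1) {
      c.putInt(rowId + i, buffer.getInt());
    }
  }
}
```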
---