LuciferYang commented on code in PR #55919:
URL: https://github.com/apache/spark/pull/55919#discussion_r3305340835
##########
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedReaderBase.java:
##########
@@ -27,6 +30,43 @@
*/
public class VectorizedReaderBase extends ValuesReader implements VectorizedValuesReader {
+ /**
+ * Encodes an unsigned long as a minimal big-endian two's-complement byte array
+ * compatible with {@link java.math.BigInteger} encoding. The result is written into
+ * the backing array of {@code buf} (which must have capacity >= 9). Returns the
+ * start offset; the valid bytes are {@code buf.array()[start .. 8]} (length = 9 - start).
+ *
+ * <p>This avoids the per-value overhead of
+ * {@code new BigInteger(Long.toUnsignedString(v)).toByteArray()} which allocates a
+ * String, a BigInteger, and a byte[] on every call.
+ */
+ static int encodeUnsignedLongBigEndian(long v, ByteBuffer buf) {
+ byte[] scratch = buf.array();
+ if (v == 0L) {
Review Comment:
When `v == 0L`, the method returns early with `start = 0` *before*
`buf.putLong(1, v)` is called. The caller then writes `9 - start` bytes:
```java
int start = encodeUnsignedLongBigEndian(v, unsignedLongBuf);
// 9 bytes when start = 0
c.putByteArray(rowId, unsignedLongBuf.array(), start, 9 - start);
```
But `scratch[1..8]` still contains data from the *previous* non-zero
call to this method (since `unsignedLongBuf` is reused across rows). This
produces a corrupt encoding: the downstream `BigInteger` constructor would
interpret the stale bytes as a non-zero value.
**Concrete example:** Given a column with values `[42L, 0L]`, the first call
encodes 42 into `scratch[1..8]`. The second call (`v = 0L`) sets `scratch[0] =
0` and returns `start = 0` without overwriting `scratch[1..8]`. The caller
writes the 9 bytes `[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x2A]`, which
`BigInteger` interprets as 42, not 0.
**Suggested fix:** Move `buf.putLong(1, v)` before the zero-check so the
buffer is always current:
```java
buf.putLong(1, v);
if (v == 0L) {
scratch[0] = 0;
return 0;
}
```
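The stale-buffer behaviour described above can be reproduced outside Spark with a small standalone sketch. Note that the PR's actual method body is not shown in this thread, so the non-zero path below (in particular the leading-zero trimming loop) and the names `StaleScratchDemo`, `encode`, `writeFirst`, and `roundTrip` are assumptions made for illustration only; just the early return on `v == 0L` and the placement of `buf.putLong(1, v)` follow the comment.

```java
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.Arrays;

public class StaleScratchDemo {

    /**
     * Hypothetical stand-in for encodeUnsignedLongBigEndian. When writeFirst is
     * false, the zero check runs before putLong (the order the review flags);
     * when true, putLong runs first (the suggested fix).
     */
    static int encode(long v, ByteBuffer buf, boolean writeFirst) {
        byte[] scratch = buf.array();
        if (writeFirst) {
            buf.putLong(1, v); // always refresh scratch[1..8]
        }
        if (v == 0L) {
            scratch[0] = 0;
            return 0; // caller will read all 9 bytes
        }
        if (!writeFirst) {
            buf.putLong(1, v);
        }
        scratch[0] = 0; // sign byte, keeps the value non-negative
        int start = 0;
        // Assumed trimming rule: drop a leading 0x00 while the next byte's sign bit is clear.
        while (start < 8 && scratch[start] == 0 && scratch[start + 1] >= 0) {
            start++;
        }
        return start;
    }

    /** Encodes each value through one reused 9-byte buffer and decodes it with BigInteger. */
    static BigInteger[] roundTrip(long[] values, boolean writeFirst) {
        ByteBuffer buf = ByteBuffer.allocate(9); // reused across rows, like unsignedLongBuf
        BigInteger[] out = new BigInteger[values.length];
        for (int i = 0; i < values.length; i++) {
            int start = encode(values[i], buf, writeFirst);
            out[i] = new BigInteger(Arrays.copyOfRange(buf.array(), start, 9));
        }
        return out;
    }

    public static void main(String[] args) {
        long[] column = {42L, 0L};
        System.out.println(Arrays.toString(roundTrip(column, false))); // prints [42, 42]
        System.out.println(Arrays.toString(roundTrip(column, true)));  // prints [42, 0]
    }
}
```

With the write moved ahead of the zero check, the zero row decodes as 0 regardless of what the previous row left in the buffer.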
##########
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java:
##########
@@ -215,6 +217,115 @@ public void skipLongs(int total) {
skipValues(total);
}
+ // ---- Bulk read helpers for readIntegers / readLongs ----------------------------
+ //
+ // The generic readValues() path dispatches a lambda per value. For the two most
+ // common callers (readIntegers, readLongs) we can do much better: compute a prefix
+ // sum over the unpacked deltas in-place, then bulk-copy the result into the column
+ // vector with putInts / putLongs (backed by System.arraycopy on-heap).
+
+ /**
+ * Callback for writing a chunk of prefix-summed absolute values from
+ * {@code unpackedValuesBuffer} into a column vector. Called once per mini-block
+ * (not per value), so lambda overhead is negligible.
+ */
+ @FunctionalInterface
+ private interface BulkWriter {
+ void write(WritableColumnVector c, int rowId, long[] values, int start, int count);
+ }
+
+ /** Narrows long[] -> int[] scratch and bulk-writes via putInts. */
+ private void bulkWriteInts(WritableColumnVector c, int rowId,
+ long[] buf, int start, int count) {
+ if (intScratchBuffer == null) {
+ intScratchBuffer = new int[miniBlockSizeInValues];
+ }
+ for (int i = start; i < start + count; i++) {
+ intScratchBuffer[i] = (int) buf[i];
+ }
+ c.putInts(rowId, count, intScratchBuffer, start);
+ }
+
+ private void readBulkIntegers(int total, WritableColumnVector c, int rowId) {
+ checkReadBounds(total);
+ int remaining = total;
+ if (valuesRead == 0) {
+ c.putInt(rowId, (int) firstValue);
+ lastValueRead = firstValue;
+ rowId++;
+ remaining--;
+ }
+ remaining = readBulkLoop(remaining, c, rowId, this::bulkWriteInts);
+ valuesRead = total - remaining;
Review Comment:
`readBulkLoop` always returns 0, because its while-loop runs until `remaining`
is exhausted. The assignment therefore reduces to `valuesRead = total`, which
overwrites the count accumulated by prior calls instead of adding to it. As a
result, the `valuesRead` term in the bounds check
`if (valuesRead + total > totalValueCount)` under-counts the cumulative values
read, so the guard fires too late and may allow reads past the page boundary on
multi-batch pages.
This is a pre-existing bug in the original `readValues` method (same file,
line 243 in master), carried forward unchanged into the new bulk paths.
**Suggested fix (applies to `readBulkIntegers`, `readBulkLongs`, and the
original `readValues`):**
```java
valuesRead += total;
```
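To make the late-firing guard concrete, here is a minimal sketch that models only the counter and the bounds check, not the actual Parquet decoding. The class `ValuesReadAccounting`, its `Reader` type, and the `accumulate` flag are hypothetical names introduced for this illustration; the check and the two update forms mirror the ones quoted in the diff and the suggested fix.

```java
public class ValuesReadAccounting {

    /** Toy model of the reader's bookkeeping; no real values are decoded. */
    static final class Reader {
        final int totalValueCount;
        final boolean accumulate; // true: valuesRead += total, false: valuesRead = total - remaining
        int valuesRead = 0;

        Reader(int totalValueCount, boolean accumulate) {
            this.totalValueCount = totalValueCount;
            this.accumulate = accumulate;
        }

        void read(int total) {
            // Same shape as checkReadBounds in the diff.
            if (valuesRead + total > totalValueCount) {
                throw new IllegalStateException("No more values to read. Total values read: "
                    + valuesRead + ", total count: " + totalValueCount
                    + ", trying to read " + total + " more.");
            }
            int remaining = total;
            while (remaining > 0) {
                remaining--; // stands in for the mini-block loop, which always drains to 0
            }
            if (accumulate) {
                valuesRead += total;
            } else {
                valuesRead = total - remaining; // always equals total
            }
        }
    }

    /** Reads three batches of 4 from a 10-value page; returns true if the third is rejected. */
    static boolean thirdReadRejected(boolean accumulate) {
        Reader r = new Reader(10, accumulate);
        r.read(4);
        r.read(4);
        try {
            r.read(4); // 12 values requested in total from a 10-value page
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(thirdReadRejected(false)); // prints false: guard sees 4 + 4, not 8 + 4
        System.out.println(thirdReadRejected(true));  // prints true
    }
}
```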
##########
sql/core/benchmarks/VectorizedDeltaReaderBenchmark-results.txt:
##########
@@ -2,92 +2,92 @@
DELTA_BINARY_PACKED INT32
================================================================================================
-OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1011-azure
-AMD EPYC 7763 64-Core Processor
+OpenJDK 64-Bit Server VM 17.0.19+10 on Linux 7.0.0-1004-azure
+AMD EPYC 9V45 96-Core Processor
Review Comment:
The baseline result files were captured on AMD EPYC 7763 (Zen 3); the new
results are from AMD EPYC 9V45 (Zen 5). This is a generational CPU upgrade with
significantly different IPC, cache hierarchy, and memory subsystem. The
reported speedup numbers (e.g. 3.8x for readLongs) include both the code
optimization AND the hardware improvement, making them unreliable for isolating
the PR's contribution.
Please also update the results for Java 21 and Java 25.
##########
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java:
##########
@@ -215,6 +217,115 @@ public void skipLongs(int total) {
skipValues(total);
}
+ // ---- Bulk read helpers for readIntegers / readLongs ----------------------------
+ //
+ // The generic readValues() path dispatches a lambda per value. For the two most
+ // common callers (readIntegers, readLongs) we can do much better: compute a prefix
+ // sum over the unpacked deltas in-place, then bulk-copy the result into the column
+ // vector with putInts / putLongs (backed by System.arraycopy on-heap).
+
+ /**
+ * Callback for writing a chunk of prefix-summed absolute values from
+ * {@code unpackedValuesBuffer} into a column vector. Called once per mini-block
+ * (not per value), so lambda overhead is negligible.
+ */
+ @FunctionalInterface
+ private interface BulkWriter {
+ void write(WritableColumnVector c, int rowId, long[] values, int start, int count);
+ }
+
+ /** Narrows long[] -> int[] scratch and bulk-writes via putInts. */
+ private void bulkWriteInts(WritableColumnVector c, int rowId,
+ long[] buf, int start, int count) {
+ if (intScratchBuffer == null) {
+ intScratchBuffer = new int[miniBlockSizeInValues];
+ }
+ for (int i = start; i < start + count; i++) {
+ intScratchBuffer[i] = (int) buf[i];
+ }
+ c.putInts(rowId, count, intScratchBuffer, start);
+ }
+
+ private void readBulkIntegers(int total, WritableColumnVector c, int rowId) {
+ checkReadBounds(total);
+ int remaining = total;
+ if (valuesRead == 0) {
+ c.putInt(rowId, (int) firstValue);
+ lastValueRead = firstValue;
+ rowId++;
+ remaining--;
+ }
+ remaining = readBulkLoop(remaining, c, rowId, this::bulkWriteInts);
+ valuesRead = total - remaining;
+ }
+
+ private void readBulkLongs(int total, WritableColumnVector c, int rowId) {
+ checkReadBounds(total);
+ int remaining = total;
+ if (valuesRead == 0) {
+ c.putLong(rowId, firstValue);
+ lastValueRead = firstValue;
+ rowId++;
+ remaining--;
+ }
+ remaining = readBulkLoop(remaining, c, rowId,
+ (col, r, buf, s, n) -> col.putLongs(r, n, buf, s));
+ valuesRead = total - remaining;
+ }
+
+ private void checkReadBounds(int total) {
+ if (valuesRead + total > totalValueCount) {
+ throw new ParquetDecodingException(
+ "No more values to read. Total values read: " + valuesRead + ", total count: "
+ + totalValueCount + ", trying to read " + total + " more.");
+ }
+ }
+
+ /** Mini-block iteration loop shared by readBulkIntegers and readBulkLongs. */
+ private int readBulkLoop(int remaining, WritableColumnVector c, int rowId,
+ BulkWriter writer) {
+ while (remaining > 0) {
Review Comment:
The loop runs until `remaining == 0`, so the return value is always 0. This
makes the return value redundant and obscures the `valuesRead` bug in the
callers (see the comment on `readBulkIntegers` above). Consider changing the
return type to `void` to make the exhaustive-loop contract explicit, and fix
the callers to use `valuesRead += total` directly.
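One possible shape for that refactor, sketched on plain arrays rather than `WritableColumnVector` so it runs standalone. Everything here (`VoidBulkLoopSketch`, `ChunkWriter`, `drain`, the fixed `CHUNK` size, and copying from a pre-decoded `source` array) is a hypothetical simplification and not the PR's code; it only illustrates a `void` loop with the caller owning the counter update.

```java
public class VoidBulkLoopSketch {

    /** Simplified analogue of BulkWriter, writing into a long[] instead of a column vector. */
    @FunctionalInterface
    interface ChunkWriter {
        void write(long[] dst, int rowId, long[] values, int start, int count);
    }

    static final int CHUNK = 4; // stands in for the mini-block size

    private final long[] source; // stands in for the already-decoded page values
    private int valuesRead = 0;

    VoidBulkLoopSketch(long[] source) {
        this.source = source;
    }

    int valuesRead() {
        return valuesRead;
    }

    void readLongs(int total, long[] dst, int rowId) {
        if (valuesRead + total > source.length) {
            throw new IllegalStateException("No more values to read");
        }
        drain(total, dst, rowId, (d, r, vals, s, n) -> System.arraycopy(vals, s, d, r, n));
        valuesRead += total; // the caller accumulates; the loop reports nothing back
    }

    /** Always consumes exactly 'remaining' values, so there is nothing useful to return. */
    private void drain(int remaining, long[] dst, int rowId, ChunkWriter writer) {
        int srcPos = valuesRead;
        while (remaining > 0) {
            int n = Math.min(remaining, CHUNK);
            writer.write(dst, rowId, source, srcPos, n);
            srcPos += n;
            rowId += n;
            remaining -= n;
        }
    }

    public static void main(String[] args) {
        VoidBulkLoopSketch reader = new VoidBulkLoopSketch(new long[] {10, 20, 30, 40, 50, 60});
        long[] dst = new long[6];
        reader.readLongs(4, dst, 0);
        reader.readLongs(2, dst, 4);
        System.out.println(reader.valuesRead()); // prints 6
    }
}
```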