iemejia commented on code in PR #55919:
URL: https://github.com/apache/spark/pull/55919#discussion_r3306624804
##########
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedReaderBase.java:
##########
@@ -27,6 +30,43 @@
*/
public class VectorizedReaderBase extends ValuesReader implements
VectorizedValuesReader {
+ /**
+ * Encodes an unsigned long as a minimal big-endian two's-complement byte
array
+ * compatible with {@link java.math.BigInteger} encoding. The result is
written into
+ * the backing array of {@code buf} (which must have capacity >= 9). Returns
the
+ * start offset; the valid bytes are {@code buf.array()[start .. 8]} (length
= 9 - start).
+ *
+ * <p>This avoids the per-value overhead of
+ * {@code new BigInteger(Long.toUnsignedString(v)).toByteArray()} which
allocates a
+ * String, a BigInteger, and a byte[] on every call.
+ */
+ static int encodeUnsignedLongBigEndian(long v, ByteBuffer buf) {
+ byte[] scratch = buf.array();
+ if (v == 0L) {
Review Comment:
Good catch. Fixed — moved `buf.putLong(1, v)` before the zero-check so the
buffer is always current when reused across rows.
##########
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java:
##########
@@ -215,6 +217,115 @@ public void skipLongs(int total) {
skipValues(total);
}
+ // ---- Bulk read helpers for readIntegers / readLongs
----------------------------
+ //
+ // The generic readValues() path dispatches a lambda per value. For the two
most
+ // common callers (readIntegers, readLongs) we can do much better: compute a
prefix
+ // sum over the unpacked deltas in-place, then bulk-copy the result into the
column
+ // vector with putInts / putLongs (backed by System.arraycopy on-heap).
+
+ /**
+ * Callback for writing a chunk of prefix-summed absolute values from
+ * {@code unpackedValuesBuffer} into a column vector. Called once per
mini-block
+ * (not per value), so lambda overhead is negligible.
+ */
+ @FunctionalInterface
+ private interface BulkWriter {
+ void write(WritableColumnVector c, int rowId, long[] values, int start,
int count);
+ }
+
+ /** Narrows long[] -> int[] scratch and bulk-writes via putInts. */
+ private void bulkWriteInts(WritableColumnVector c, int rowId,
+ long[] buf, int start, int count) {
+ if (intScratchBuffer == null) {
+ intScratchBuffer = new int[miniBlockSizeInValues];
+ }
+ for (int i = start; i < start + count; i++) {
+ intScratchBuffer[i] = (int) buf[i];
+ }
+ c.putInts(rowId, count, intScratchBuffer, start);
+ }
+
+ private void readBulkIntegers(int total, WritableColumnVector c, int rowId) {
+ checkReadBounds(total);
+ int remaining = total;
+ if (valuesRead == 0) {
+ c.putInt(rowId, (int) firstValue);
+ lastValueRead = firstValue;
+ rowId++;
+ remaining--;
+ }
+ remaining = readBulkLoop(remaining, c, rowId, this::bulkWriteInts);
+ valuesRead = total - remaining;
Review Comment:
Fixed in `readBulkIntegers`, `readBulkLongs`, and also in the original
`readValues` (the pre-existing bug you mentioned). All three now use
`valuesRead += total`.
##########
sql/core/benchmarks/VectorizedDeltaReaderBenchmark-results.txt:
##########
@@ -2,92 +2,92 @@
DELTA_BINARY_PACKED INT32
================================================================================================
-OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1011-azure
-AMD EPYC 7763 64-Core Processor
+OpenJDK 64-Bit Server VM 17.0.19+10 on Linux 7.0.0-1004-azure
+AMD EPYC 9V45 96-Core Processor
Review Comment:
You're right. I've reverted the results file to the upstream baseline. I
expect the CI workflow to regenerate it on consistent hardware with Java
17/21/25.
##########
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java:
##########
@@ -215,6 +217,115 @@ public void skipLongs(int total) {
skipValues(total);
}
+ // ---- Bulk read helpers for readIntegers / readLongs
----------------------------
+ //
+ // The generic readValues() path dispatches a lambda per value. For the two
most
+ // common callers (readIntegers, readLongs) we can do much better: compute a
prefix
+ // sum over the unpacked deltas in-place, then bulk-copy the result into the
column
+ // vector with putInts / putLongs (backed by System.arraycopy on-heap).
+
+ /**
+ * Callback for writing a chunk of prefix-summed absolute values from
+ * {@code unpackedValuesBuffer} into a column vector. Called once per
mini-block
+ * (not per value), so lambda overhead is negligible.
+ */
+ @FunctionalInterface
+ private interface BulkWriter {
+ void write(WritableColumnVector c, int rowId, long[] values, int start,
int count);
+ }
+
+ /** Narrows long[] -> int[] scratch and bulk-writes via putInts. */
+ private void bulkWriteInts(WritableColumnVector c, int rowId,
+ long[] buf, int start, int count) {
+ if (intScratchBuffer == null) {
+ intScratchBuffer = new int[miniBlockSizeInValues];
+ }
+ for (int i = start; i < start + count; i++) {
+ intScratchBuffer[i] = (int) buf[i];
+ }
+ c.putInts(rowId, count, intScratchBuffer, start);
+ }
+
+ private void readBulkIntegers(int total, WritableColumnVector c, int rowId) {
+ checkReadBounds(total);
+ int remaining = total;
+ if (valuesRead == 0) {
+ c.putInt(rowId, (int) firstValue);
+ lastValueRead = firstValue;
+ rowId++;
+ remaining--;
+ }
+ remaining = readBulkLoop(remaining, c, rowId, this::bulkWriteInts);
+ valuesRead = total - remaining;
+ }
+
+ private void readBulkLongs(int total, WritableColumnVector c, int rowId) {
+ checkReadBounds(total);
+ int remaining = total;
+ if (valuesRead == 0) {
+ c.putLong(rowId, firstValue);
+ lastValueRead = firstValue;
+ rowId++;
+ remaining--;
+ }
+ remaining = readBulkLoop(remaining, c, rowId,
+ (col, r, buf, s, n) -> col.putLongs(r, n, buf, s));
+ valuesRead = total - remaining;
+ }
+
+ private void checkReadBounds(int total) {
+ if (valuesRead + total > totalValueCount) {
+ throw new ParquetDecodingException(
+ "No more values to read. Total values read: " + valuesRead + ",
total count: "
+ + totalValueCount + ", trying to read " + total + " more.");
+ }
+ }
+
+ /** Mini-block iteration loop shared by readBulkIntegers and readBulkLongs.
*/
+ private int readBulkLoop(int remaining, WritableColumnVector c, int rowId,
+ BulkWriter writer) {
+ while (remaining > 0) {
Review Comment:
Done. Changed `readBulkLoop` to `void` and removed the dead `return
remaining` since the loop always exhausts it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]