This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push: new dc2abe51 [SPARK-29918][SQL] RecordBinaryComparator should check endianness when compared by long dc2abe51 is described below commit dc2abe51ca2d3d702d6b6457301c3ca9c7244212 Author: wangguangxin.cn <wangguangxin...@bytedance.com> AuthorDate: Tue Nov 19 16:10:22 2019 +0800 [SPARK-29918][SQL] RecordBinaryComparator should check endianness when compared by long ### What changes were proposed in this pull request? This PR tries to make sure the comparison results of `comparing 8 bytes at a time` and `comparing byte by byte` in RecordBinaryComparator are *consistent*, by reversing the bytes of each long if the platform is little-endian and using Long.compareUnsigned. ### Why are the changes needed? If the architecture supports unaligned access or the offset is 8-byte aligned, `RecordBinaryComparator` compares 8 bytes at a time by reading 8 bytes as a long. The related code is ``` if (Platform.unaligned() || (((leftOff + i) % 8 == 0) && ((rightOff + i) % 8 == 0))) { while (i <= leftLen - 8) { final long v1 = Platform.getLong(leftObj, leftOff + i); final long v2 = Platform.getLong(rightObj, rightOff + i); if (v1 != v2) { return v1 > v2 ? 1 : -1; } i += 8; } } ``` Otherwise, it compares byte by byte. The related code is ``` while (i < leftLen) { final int v1 = Platform.getByte(leftObj, leftOff + i) & 0xff; final int v2 = Platform.getByte(rightObj, rightOff + i) & 0xff; if (v1 != v2) { return v1 > v2 ? 1 : -1; } i += 1; } ``` However, on a little-endian machine, the results of *comparing by a long value* and *comparing byte by byte* may be different. For two identical records, their offsets may vary between the first run and the second run, so they may be compared using long comparison in one run and byte-by-byte comparison in the other, and the results may differ. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Added new test cases in RecordBinaryComparatorSuite. Closes #26548 from WangGuangxin/binary_comparator. 
Authored-by: wangguangxin.cn <wangguangxin...@bytedance.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit ffc97530371433bc0221e06d8c1d11af8d92bd94) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../sql/execution/RecordBinaryComparator.java | 30 +++++++++----- .../sort/RecordBinaryComparatorSuite.java | 47 +++++++++++++++++++++- 2 files changed, 67 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java index 40c2cc8..1f24340 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java @@ -20,8 +20,13 @@ package org.apache.spark.sql.execution; import org.apache.spark.unsafe.Platform; import org.apache.spark.util.collection.unsafe.sort.RecordComparator; +import java.nio.ByteOrder; + public final class RecordBinaryComparator extends RecordComparator { + private static final boolean LITTLE_ENDIAN = + ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN); + @Override public int compare( Object leftObj, long leftOff, int leftLen, Object rightObj, long rightOff, int rightLen) { @@ -38,10 +43,10 @@ public final class RecordBinaryComparator extends RecordComparator { // check if stars align and we can get both offsets to be aligned if ((leftOff % 8) == (rightOff % 8)) { while ((leftOff + i) % 8 != 0 && i < leftLen) { - final int v1 = Platform.getByte(leftObj, leftOff + i) & 0xff; - final int v2 = Platform.getByte(rightObj, rightOff + i) & 0xff; + final int v1 = Platform.getByte(leftObj, leftOff + i); + final int v2 = Platform.getByte(rightObj, rightOff + i); if (v1 != v2) { - return v1 > v2 ? 1 : -1; + return (v1 & 0xff) > (v2 & 0xff) ? 
1 : -1; } i += 1; } @@ -49,10 +54,17 @@ public final class RecordBinaryComparator extends RecordComparator { // for architectures that support unaligned accesses, chew it up 8 bytes at a time if (Platform.unaligned() || (((leftOff + i) % 8 == 0) && ((rightOff + i) % 8 == 0))) { while (i <= leftLen - 8) { - final long v1 = Platform.getLong(leftObj, leftOff + i); - final long v2 = Platform.getLong(rightObj, rightOff + i); + long v1 = Platform.getLong(leftObj, leftOff + i); + long v2 = Platform.getLong(rightObj, rightOff + i); if (v1 != v2) { - return v1 > v2 ? 1 : -1; + if (LITTLE_ENDIAN) { + // if read as little-endian, we have to reverse bytes so that the long comparison result + // is equivalent to byte-by-byte comparison result. + // See discussion in https://github.com/apache/spark/pull/26548#issuecomment-554645859 + v1 = Long.reverseBytes(v1); + v2 = Long.reverseBytes(v2); + } + return Long.compareUnsigned(v1, v2); } i += 8; } @@ -60,10 +72,10 @@ public final class RecordBinaryComparator extends RecordComparator { // this will finish off the unaligned comparisons, or do the entire aligned comparison // whichever is needed. while (i < leftLen) { - final int v1 = Platform.getByte(leftObj, leftOff + i) & 0xff; - final int v2 = Platform.getByte(rightObj, rightOff + i) & 0xff; + final int v1 = Platform.getByte(leftObj, leftOff + i); + final int v2 = Platform.getByte(rightObj, rightOff + i); if (v1 != v2) { - return v1 > v2 ? 1 : -1; + return (v1 & 0xff) > (v2 & 0xff) ? 
1 : -1; } i += 1; } diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java index 97f3dc5..9664638e 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java @@ -32,6 +32,7 @@ import org.apache.spark.unsafe.types.UTF8String; import org.apache.spark.util.collection.unsafe.sort.*; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -271,7 +272,7 @@ public class RecordBinaryComparatorSuite { insertRow(row1); insertRow(row2); - assert(compare(0, 1) < 0); + assert(compare(0, 1) > 0); } @Test @@ -319,4 +320,48 @@ public class RecordBinaryComparatorSuite { assert(compare(0, 1) < 0); } + + @Test + public void testCompareLongsAsLittleEndian() { + long arrayOffset = 12; + + long[] arr1 = new long[2]; + Platform.putLong(arr1, arrayOffset, 0x0100000000000000L); + long[] arr2 = new long[2]; + Platform.putLong(arr2, arrayOffset + 4, 0x0000000000000001L); + // leftBaseOffset is not aligned while rightBaseOffset is aligned, + // it will start by comparing long + int result1 = binaryComparator.compare(arr1, arrayOffset, 8, arr2, arrayOffset + 4, 8); + + long[] arr3 = new long[2]; + Platform.putLong(arr3, arrayOffset, 0x0100000000000000L); + long[] arr4 = new long[2]; + Platform.putLong(arr4, arrayOffset, 0x0000000000000001L); + // both left and right offset is not aligned, it will start with byte-by-byte comparison + int result2 = binaryComparator.compare(arr3, arrayOffset, 8, arr4, arrayOffset, 8); + + Assert.assertEquals(result1, result2); + } + + @Test + public void testCompareLongsAsUnsigned() { + long arrayOffset = 12; + + long[] arr1 = new long[2]; + Platform.putLong(arr1, arrayOffset + 4, 0xa000000000000000L); + long[] arr2 
= new long[2]; + Platform.putLong(arr2, arrayOffset + 4, 0x0000000000000000L); + // both leftBaseOffset and rightBaseOffset are aligned, so it will start by comparing long + int result1 = binaryComparator.compare(arr1, arrayOffset + 4, 8, arr2, arrayOffset + 4, 8); + + long[] arr3 = new long[2]; + Platform.putLong(arr3, arrayOffset, 0xa000000000000000L); + long[] arr4 = new long[2]; + Platform.putLong(arr4, arrayOffset, 0x0000000000000000L); + // both leftBaseOffset and rightBaseOffset are not aligned, + // so it will start with byte-by-byte comparison + int result2 = binaryComparator.compare(arr3, arrayOffset, 8, arr4, arrayOffset, 8); + + Assert.assertEquals(result1, result2); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org