neilramaswamy commented on code in PR #45778:
URL: https://github.com/apache/spark/pull/45778#discussion_r1548751462


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -276,53 +283,113 @@ class RangeKeyScanStateEncoder(
     rangeScanKeyProjection(key)
   }
 
+  // bit masks used for checking sign or flipping all bits for negative 
float/double values
+  private val floatFlipBitMask = 0xFFFFFFFF
+  private val floatSignBitMask = 0x80000000
+
+  private val doubleFlipBitMask = 0xFFFFFFFFFFFFFFFFL
+  private val doubleSignBitMask = 0x8000000000000000L
+
+  // Byte markers used to identify whether the value is null, negative or 
positive
+  // To ensure sorted ordering, we use the lowest byte value for negative 
numbers followed by
+  // positive numbers and then null values.
+  private val negativeValMarker: Byte = 0x00.toByte
+  private val positiveValMarker: Byte = 0x01.toByte
+  private val nullValMarker: Byte = 0x02.toByte
+
   // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN 
encoding
   // using byte arrays.
   // To handle "null" values, we prepend a byte to the byte array indicating 
whether the value
-  // is null or not. If the value is null, we write the null byte followed by 
a zero byte.
+  // is null or not. If the value is null, we write the null byte followed by 
zero bytes.
   // If the value is not null, we write the null byte followed by the value.
   // Note that setting null for the index on the unsafeRow is not feasible as 
it would change
   // the sorting order on iteration.
+  // Also note that the same byte is used to indicate whether the value is 
negative or not.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      // initialize the value to indicate positive value to begin with
+      var isNullOrSignCol: Byte = positiveValMarker
+      // Update the isNullOrSignCol byte (if required) to indicate null value
+      if (value == null) {
+        isNullOrSignCol = nullValMarker
+      }
       // Note that we cannot allocate a smaller buffer here even if the value 
is null
       // because the effective byte array is considered variable size and 
needs to have
       // the same size across all rows for the ordering to work as expected.
       val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
       bbuf.order(ByteOrder.BIG_ENDIAN)
-      bbuf.put(isNullCol)
-      if (isNullCol == 0x01.toByte) {
+      if (isNullOrSignCol == nullValMarker) {
+        bbuf.put(isNullOrSignCol)
         writer.write(idx, bbuf.array())
       } else {
         field.dataType match {
           case BooleanType =>
           case ByteType =>
+            bbuf.put(isNullOrSignCol)
             bbuf.put(value.asInstanceOf[Byte])
             writer.write(idx, bbuf.array())
 
-          // for other multi-byte types, we need to convert to big-endian
           case ShortType =>
+            if (value.asInstanceOf[Short] < 0) {
+              isNullOrSignCol = negativeValMarker
+            }
+            bbuf.put(isNullOrSignCol)
             bbuf.putShort(value.asInstanceOf[Short])
             writer.write(idx, bbuf.array())
 
           case IntegerType =>
+            if (value.asInstanceOf[Int] < 0) {
+              isNullOrSignCol = negativeValMarker
+            }
+            bbuf.put(isNullOrSignCol)
             bbuf.putInt(value.asInstanceOf[Int])
             writer.write(idx, bbuf.array())
 
           case LongType =>
+            if (value.asInstanceOf[Long] < 0) {
+              isNullOrSignCol = negativeValMarker
+            }
+            bbuf.put(isNullOrSignCol)
             bbuf.putLong(value.asInstanceOf[Long])
             writer.write(idx, bbuf.array())
 
           case FloatType =>
-            bbuf.putFloat(value.asInstanceOf[Float])
+            // for negative values, we need to flip all the bits to ensure 
correct ordering
+            val rawBits = floatToRawIntBits(value.asInstanceOf[Float])
+            // perform sign comparison using bit manipulation to ensure NaN 
values are handled
+            // correctly
+            if ((rawBits & floatSignBitMask) != 0) {
+              // flip all the bits

Review Comment:
   Hm, feels like we're brushing aside the complexity here. We ought to explain 
_why_ flipping the bits works (it's not obvious). Here's why I think this works:
   
   IEEE 754 has the following format: `[sign bit, exponent, mantissa]`. Let's 
say that the sign bit is `1`, so we have a negative number. When the exponent 
is lexicographically larger, then we have a more negative number (same with 
mantissa). We want the opposite to be true, i.e. when the encoded 
exponent/mantissa is lexicographically larger, we have a less negative (i.e. 
larger) number.
   
   How can we do that? Well, we can map the negative numbers into the 
positive range while reversing their order. Treat the `n` raw bits as an 
unsigned integer `b`; flipping all the bits is the same as computing 
`(2^n - 1) - b`. Then, if `x` and `y` are negative s.t. `|x| > |y|`, their raw 
bits satisfy `bits(x) > bits(y)`, so `(2^n - 1) - bits(x) < (2^n - 1) - 
bits(y)`, i.e. the more negative value now sorts first.
   
   Not the most elegant explanation (I'm sure there's better), but at least 
it's not evading complexity.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -294,6 +295,55 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  testWithColumnFamilies("rocksdb range scan - variable size non-ordering 
columns with " +
+    "double type values are supported",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+
+    val testSchema: StructType = StructType(
+      Seq(StructField("key1", DoubleType, false),
+        StructField("key2", StringType, false)))
+
+    val schemaProj = UnsafeProjection.create(Array[DataType](DoubleType, 
StringType))
+    tryWithProviderResource(newStoreProvider(testSchema,
+      RangeKeyScanStateEncoderSpec(testSchema, 1), colFamiliesEnabled)) { 
provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          testSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(testSchema, 1))
+      }
+
+      // Verify that the sort ordering here is as follows:
+      // -NaN, -Infinity, -ve values, 0, +ve values, +Infinity, +NaN
+      val timerTimestamps: Seq[Double] = Seq(6894.32, 345.2795, -23.24, 24.466,
+        7860.0, 4535.55, 423.42, -5350.355, 0.0, 0.001, 0.233, -53.255, 
-66.356, -244.452,
+        96456466.3536677, 14421434453.43524562, Double.NaN, 
Double.PositiveInfinity,

Review Comment:
   There are many `NaN`s as per IEEE 754—are there only two valid/possible NaNs 
(+/-) in Java?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -294,6 +295,55 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  testWithColumnFamilies("rocksdb range scan - variable size non-ordering 
columns with " +
+    "double type values are supported",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+
+    val testSchema: StructType = StructType(
+      Seq(StructField("key1", DoubleType, false),
+        StructField("key2", StringType, false)))
+
+    val schemaProj = UnsafeProjection.create(Array[DataType](DoubleType, 
StringType))
+    tryWithProviderResource(newStoreProvider(testSchema,
+      RangeKeyScanStateEncoderSpec(testSchema, 1), colFamiliesEnabled)) { 
provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          testSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(testSchema, 1))
+      }
+
+      // Verify that the sort ordering here is as follows:
+      // -NaN, -Infinity, -ve values, 0, +ve values, +Infinity, +NaN
+      val timerTimestamps: Seq[Double] = Seq(6894.32, 345.2795, -23.24, 24.466,
+        7860.0, 4535.55, 423.42, -5350.355, 0.0, 0.001, 0.233, -53.255, 
-66.356, -244.452,

Review Comment:
   +/- 0.0?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -276,53 +283,113 @@ class RangeKeyScanStateEncoder(
     rangeScanKeyProjection(key)
   }
 
+  // bit masks used for checking sign or flipping all bits for negative 
float/double values
+  private val floatFlipBitMask = 0xFFFFFFFF
+  private val floatSignBitMask = 0x80000000
+
+  private val doubleFlipBitMask = 0xFFFFFFFFFFFFFFFFL
+  private val doubleSignBitMask = 0x8000000000000000L
+
+  // Byte markers used to identify whether the value is null, negative or 
positive
+  // To ensure sorted ordering, we use the lowest byte value for negative 
numbers followed by
+  // positive numbers and then null values.
+  private val negativeValMarker: Byte = 0x00.toByte
+  private val positiveValMarker: Byte = 0x01.toByte
+  private val nullValMarker: Byte = 0x02.toByte
+
   // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN 
encoding
   // using byte arrays.
   // To handle "null" values, we prepend a byte to the byte array indicating 
whether the value
-  // is null or not. If the value is null, we write the null byte followed by 
a zero byte.
+  // is null or not. If the value is null, we write the null byte followed by 
zero bytes.
   // If the value is not null, we write the null byte followed by the value.
   // Note that setting null for the index on the unsafeRow is not feasible as 
it would change
   // the sorting order on iteration.
+  // Also note that the same byte is used to indicate whether the value is 
negative or not.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      // initialize the value to indicate positive value to begin with
+      var isNullOrSignCol: Byte = positiveValMarker
+      // Update the isNullOrSignCol byte (if required) to indicate null value
+      if (value == null) {
+        isNullOrSignCol = nullValMarker
+      }
       // Note that we cannot allocate a smaller buffer here even if the value 
is null
       // because the effective byte array is considered variable size and 
needs to have
       // the same size across all rows for the ordering to work as expected.
       val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
       bbuf.order(ByteOrder.BIG_ENDIAN)
-      bbuf.put(isNullCol)
-      if (isNullCol == 0x01.toByte) {
+      if (isNullOrSignCol == nullValMarker) {
+        bbuf.put(isNullOrSignCol)
         writer.write(idx, bbuf.array())
       } else {
         field.dataType match {
           case BooleanType =>
           case ByteType =>
+            bbuf.put(isNullOrSignCol)
             bbuf.put(value.asInstanceOf[Byte])
             writer.write(idx, bbuf.array())
 
-          // for other multi-byte types, we need to convert to big-endian
           case ShortType =>
+            if (value.asInstanceOf[Short] < 0) {
+              isNullOrSignCol = negativeValMarker
+            }
+            bbuf.put(isNullOrSignCol)
             bbuf.putShort(value.asInstanceOf[Short])
             writer.write(idx, bbuf.array())
 
           case IntegerType =>
+            if (value.asInstanceOf[Int] < 0) {
+              isNullOrSignCol = negativeValMarker
+            }
+            bbuf.put(isNullOrSignCol)
             bbuf.putInt(value.asInstanceOf[Int])
             writer.write(idx, bbuf.array())
 
           case LongType =>
+            if (value.asInstanceOf[Long] < 0) {
+              isNullOrSignCol = negativeValMarker
+            }
+            bbuf.put(isNullOrSignCol)
             bbuf.putLong(value.asInstanceOf[Long])
             writer.write(idx, bbuf.array())
 
           case FloatType =>
-            bbuf.putFloat(value.asInstanceOf[Float])
+            // for negative values, we need to flip all the bits to ensure 
correct ordering
+            val rawBits = floatToRawIntBits(value.asInstanceOf[Float])
+            // perform sign comparison using bit manipulation to ensure NaN 
values are handled
+            // correctly
+            if ((rawBits & floatSignBitMask) != 0) {
+              // flip all the bits

Review Comment:
   Or, maybe Wikipedia notes it somewhere :)



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -276,53 +283,113 @@ class RangeKeyScanStateEncoder(
     rangeScanKeyProjection(key)
   }
 
+  // bit masks used for checking sign or flipping all bits for negative 
float/double values
+  private val floatFlipBitMask = 0xFFFFFFFF
+  private val floatSignBitMask = 0x80000000
+
+  private val doubleFlipBitMask = 0xFFFFFFFFFFFFFFFFL
+  private val doubleSignBitMask = 0x8000000000000000L
+
+  // Byte markers used to identify whether the value is null, negative or 
positive
+  // To ensure sorted ordering, we use the lowest byte value for negative 
numbers followed by
+  // positive numbers and then null values.
+  private val negativeValMarker: Byte = 0x00.toByte
+  private val positiveValMarker: Byte = 0x01.toByte
+  private val nullValMarker: Byte = 0x02.toByte
+
   // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN 
encoding
   // using byte arrays.
   // To handle "null" values, we prepend a byte to the byte array indicating 
whether the value
-  // is null or not. If the value is null, we write the null byte followed by 
a zero byte.
+  // is null or not. If the value is null, we write the null byte followed by 
zero bytes.
   // If the value is not null, we write the null byte followed by the value.
   // Note that setting null for the index on the unsafeRow is not feasible as 
it would change
   // the sorting order on iteration.
+  // Also note that the same byte is used to indicate whether the value is 
negative or not.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      // initialize the value to indicate positive value to begin with
+      var isNullOrSignCol: Byte = positiveValMarker

Review Comment:
   I think this code with the marker is a bit convoluted. What about something 
like:
   
   ```
   // For each field
   bbuf = allocate()
   if null:
       bbuf.put(nullMarker)
   else:
       // Switch on case
       val marker = positiveMarker if val >= 0 else negativeMarker
       bbuf.put(marker)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to