neilramaswamy commented on code in PR #45778:
URL: https://github.com/apache/spark/pull/45778#discussion_r1546662169


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -283,45 +286,79 @@ class RangeKeyScanStateEncoder(
   // If the value is not null, we write the null byte followed by the value.
   // Note that setting null for the index on the unsafeRow is not feasible as 
it would change
   // the sorting order on iteration.
+  // Also note that the same byte is used to indicate whether the value is 
negative or not.

Review Comment:
   More importantly, I think we need to comment why we chose the byte values 
the way we did. We can't encode positive numbers with `0x00`, for example. It 
needs to be `0x01`.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -283,45 +286,79 @@ class RangeKeyScanStateEncoder(
   // If the value is not null, we write the null byte followed by the value.
   // Note that setting null for the index on the unsafeRow is not feasible as 
it would change
   // the sorting order on iteration.
+  // Also note that the same byte is used to indicate whether the value is 
negative or not.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      // initialize to 0x01 to indicate that the column is not null and 
positive
+      var isNullOrSignCol: Byte = 0x01.toByte

Review Comment:
   Can we refactor these `0x01` `0x00` to some named constants? I know what's 
was going on here, but the first read was high friction.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -283,45 +286,79 @@ class RangeKeyScanStateEncoder(
   // If the value is not null, we write the null byte followed by the value.
   // Note that setting null for the index on the unsafeRow is not feasible as 
it would change
   // the sorting order on iteration.
+  // Also note that the same byte is used to indicate whether the value is 
negative or not.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      // initialize to 0x01 to indicate that the column is not null and 
positive
+      var isNullOrSignCol: Byte = 0x01.toByte
+      // Update the isNullOrSignCol byte to indicate null value
+      if (value == null) {
+        isNullOrSignCol = 0x02.toByte
+      }
       // Note that we cannot allocate a smaller buffer here even if the value 
is null
       // because the effective byte array is considered variable size and 
needs to have
       // the same size across all rows for the ordering to work as expected.
       val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
       bbuf.order(ByteOrder.BIG_ENDIAN)
-      bbuf.put(isNullCol)
-      if (isNullCol == 0x01.toByte) {
+      if (isNullOrSignCol == 0x02.toByte) {
+        bbuf.put(isNullOrSignCol)
         writer.write(idx, bbuf.array())
       } else {
         field.dataType match {
           case BooleanType =>
           case ByteType =>
+            bbuf.put(isNullOrSignCol)
             bbuf.put(value.asInstanceOf[Byte])
             writer.write(idx, bbuf.array())
 
-          // for other multi-byte types, we need to convert to big-endian
           case ShortType =>
+            if (value.asInstanceOf[Short] < 0) {
+              isNullOrSignCol = 0x00.toByte
+            }
+            bbuf.put(isNullOrSignCol)
             bbuf.putShort(value.asInstanceOf[Short])
             writer.write(idx, bbuf.array())
 
           case IntegerType =>
+            if (value.asInstanceOf[Int] < 0) {
+              isNullOrSignCol = 0x00.toByte
+            }
+            bbuf.put(isNullOrSignCol)
             bbuf.putInt(value.asInstanceOf[Int])
             writer.write(idx, bbuf.array())
 
           case LongType =>
+            if (value.asInstanceOf[Long] < 0) {
+              isNullOrSignCol = 0x00.toByte
+            }
+            bbuf.put(isNullOrSignCol)
             bbuf.putLong(value.asInstanceOf[Long])
             writer.write(idx, bbuf.array())
 
+          // For floating point types, we cannot support ordering using 
additional byte for
+          // negative values. This is because the IEEE 754 floating point 
representation
+          // stores exponent first and then the mantissa. So, we cannot simply 
prepend a byte

Review Comment:
   I'm not sure that the relative position of the exponent and mantissa is the 
issue here ? IIUC the reason is that (assuming the sign bit is `1`) as the 
exponent gets larger, the number becomes more negative. Which means that a 
lexicographically smaller exponent means that the number is closer to 0, i.e. 
should appear _later_.
   
   In two's complement, I think the trick of flipping the sign bit works since 
the _rest_ of the bits being lexicographically small mean that the number 
itself is actually smaller. That is, `0b1010` (`-6`) is larger than `0b1001` 
(`-7`) and indeed, if we disregard the MSB, `010` is lexicographically larger 
than `001`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to