sahnib commented on code in PR #45778:
URL: https://github.com/apache/spark/pull/45778#discussion_r1547188291


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -276,53 +283,113 @@ class RangeKeyScanStateEncoder(
     rangeScanKeyProjection(key)
   }
 
+  // bit masks used for checking sign or flipping all bits for negative float/double values
+  private val floatFlipBitMask = 0xFFFFFFFF
+  private val floatSignBitMask = 0x80000000
+
+  private val doubleFlipBitMask = 0xFFFFFFFFFFFFFFFFL
+  private val doubleSignBitMask = 0x8000000000000000L
+
+  // Byte markers used to identify whether the value is null, negative or positive
+  // To ensure sorted ordering, we use the lowest byte value for negative numbers followed by
+  // positive numbers and then null values.
+  private val negativeValMarker: Byte = 0x00.toByte
+  private val positiveValMarker: Byte = 0x01.toByte
+  private val nullValMarker: Byte = 0x02.toByte
+
   // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN encoding
   // using byte arrays.
   // To handle "null" values, we prepend a byte to the byte array indicating whether the value
-  // is null or not. If the value is null, we write the null byte followed by a zero byte.
+  // is null or not. If the value is null, we write the null byte followed by zero bytes.
   // If the value is not null, we write the null byte followed by the value.
   // Note that setting null for the index on the unsafeRow is not feasible as it would change
   // the sorting order on iteration.
+  // Also note that the same byte is used to indicate whether the value is negative or not.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      // initialize the value to indicate positive value to begin with
+      var isNullOrSignCol: Byte = positiveValMarker
+      // Update the isNullOrSignCol byte (if required) to indicate null value
+      if (value == null) {
+        isNullOrSignCol = nullValMarker
+      }
       // Note that we cannot allocate a smaller buffer here even if the value is null
       // because the effective byte array is considered variable size and needs to have
       // the same size across all rows for the ordering to work as expected.
       val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
       bbuf.order(ByteOrder.BIG_ENDIAN)
-      bbuf.put(isNullCol)
-      if (isNullCol == 0x01.toByte) {
+      if (isNullOrSignCol == nullValMarker) {
+        bbuf.put(isNullOrSignCol)
         writer.write(idx, bbuf.array())
       } else {
         field.dataType match {
           case BooleanType =>
           case ByteType =>
+            bbuf.put(isNullOrSignCol)
             bbuf.put(value.asInstanceOf[Byte])
             writer.write(idx, bbuf.array())
 
-          // for other multi-byte types, we need to convert to big-endian
           case ShortType =>
+            if (value.asInstanceOf[Short] < 0) {
+              isNullOrSignCol = negativeValMarker
+            }
+            bbuf.put(isNullOrSignCol)
             bbuf.putShort(value.asInstanceOf[Short])
             writer.write(idx, bbuf.array())

Review Comment:
   [nit] we could extract the Short/Integer/Long handling to a common typed method.
   
   ```
   def encodeIntegralValue[T](value: T, bbuf: ByteBuffer)(implicit num: Numeric[T]): Unit = {
     if (num.lt(value, num.zero)) {
       isNullOrSignCol = negativeValMarker
     }
     bbuf.put(isNullOrSignCol)
     // use the width-appropriate put for the concrete type
     value match {
       case s: Short => bbuf.putShort(s)
       case i: Int => bbuf.putInt(i)
       case l: Long => bbuf.putLong(l)
     }
     writer.write(idx, bbuf.array())
   }
   ```
   
   same for Float/Double. 
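
   As a self-contained illustration of the Float case (hypothetical object and method names, not part of the PR), the diff's approach is: prepend the low marker byte for negatives and flip all payload bits, so that unsigned lexicographic byte comparison matches numeric order:

   ```scala
   import java.nio.{ByteBuffer, ByteOrder}

   object FloatRangeEncodingSketch {
     // Marker and mask values mirror those added in the patch.
     val negativeValMarker: Byte = 0x00.toByte
     val positiveValMarker: Byte = 0x01.toByte
     val floatFlipBitMask: Int = 0xFFFFFFFF
     val floatSignBitMask: Int = 0x80000000

     // Encode a float as 1 marker byte + 4 big-endian payload bytes such that
     // unsigned lexicographic byte order matches numeric order.
     def encodeFloat(value: Float): Array[Byte] = {
       val bbuf = ByteBuffer.allocate(5).order(ByteOrder.BIG_ENDIAN)
       val rawBits = java.lang.Float.floatToIntBits(value)
       if ((rawBits & floatSignBitMask) != 0) {
         // Flipping all bits makes more-negative floats produce smaller bytes.
         bbuf.put(negativeValMarker)
         bbuf.putInt(rawBits ^ floatFlipBitMask)
       } else {
         bbuf.put(positiveValMarker)
         bbuf.putInt(rawBits)
       }
       bbuf.array()
     }
   }
   ```

   Under `java.util.Arrays.compareUnsigned`, the encodings of -2.0f, -1.0f and 1.0f then sort in that numeric order.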



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -206,19 +208,24 @@ class PrefixKeyScanStateEncoder(
  * for the range scan into an UnsafeRow; we then rewrite that UnsafeRow's fields in BIG_ENDIAN
  * to allow for scanning keys in sorted order using the byte-wise comparison method that
  * RocksDB uses.
+ *
  * Then, for the rest of the fields, we project those into another UnsafeRow.
  * We then effectively join these two UnsafeRows together, and finally take those bytes
  * to get the resulting row.
+ *
  * We cannot support variable sized fields given the UnsafeRow format which stores variable
  * sized fields as offset and length pointers to the actual values, thereby changing the required
  * ordering.
+ *
  * Note that we also support "null" values being passed for these fixed size fields. We prepend
  * a single byte to indicate whether the column value is null or not. We cannot change the
  * nullability on the UnsafeRow itself as the expected ordering would change if non-first
  * columns are marked as null. If the first col is null, those entries will appear last in
  * the iterator. If non-first columns are null, ordering based on the previous columns will
  * still be honored. For rows with null column values, ordering for subsequent columns
- * will also be maintained within those set of rows.
+ * will also be maintained within those set of rows. We use the same byte to also encode whether
+ * the value is negative or not. For negative float/double values, we flip all the bits to ensure
+ * the right lexicographical ordering.

Review Comment:
   [nit] I know it's not part of this PR, but can we also change the parameter name `numColsPrefixKey` in `RangeKeyScanStateEncoderSpec` to `numOrderingCols`, as used here.
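
   As background for the BIG_ENDIAN rewrite described in the doc comment above, big-endian bytes of non-negative integers already compare lexicographically in numeric order, while the raw sign bit makes negatives compare after positives; a minimal sketch (hypothetical helper, not from the PR):

   ```scala
   import java.nio.{ByteBuffer, ByteOrder}

   // Illustrates why the encoder rewrites fixed-size fields in BIG_ENDIAN:
   // for non-negative ints, unsigned lexicographic byte order equals numeric
   // order. Signed negatives break this (the set sign bit makes them compare
   // larger), which is what the prepended sign marker byte corrects.
   object BigEndianOrderSketch {
     def bigEndianBytes(v: Int): Array[Byte] =
       ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN).putInt(v).array()
   }
   ```

   For example, `compareUnsigned(bigEndianBytes(3), bigEndianBytes(200))` is negative, but `compareUnsigned(bigEndianBytes(-1), bigEndianBytes(1))` is positive, so raw big-endian bytes alone would place -1 after 1.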



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
