HeartSaVioR commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1536546801


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -271,97 +277,97 @@ class RangeKeyScanStateEncoder(
 
   // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN 
encoding
   // using byte arrays.
+  // To handle "null" values, we prepend a byte to the byte array indicating 
whether the value
+  // is null or not. If the value is null, we write the null byte followed by 
a zero byte.
+  // If the value is not null, we write the null byte followed by the value.
+  // Note that setting null for the index on the unsafeRow is not feasible as 
it would change
+  // the sorting order on iteration.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      field.dataType match {
-        // endian-ness doesn't matter for single byte objects. so just write 
these
-        // types directly.
-        case BooleanType => writer.write(idx, value.asInstanceOf[Boolean])
-        case ByteType => writer.write(idx, value.asInstanceOf[Byte])
-
-        // for other multi-byte types, we need to convert to big-endian
-        case ShortType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putShort(value.asInstanceOf[Short])
-          writer.write(idx, bbuf.array())
-
-        case IntegerType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putInt(value.asInstanceOf[Int])
-          writer.write(idx, bbuf.array())
-
-        case LongType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putLong(value.asInstanceOf[Long])
-          writer.write(idx, bbuf.array())
-
-        case FloatType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putFloat(value.asInstanceOf[Float])
-          writer.write(idx, bbuf.array())
-
-        case DoubleType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putDouble(value.asInstanceOf[Double])
-          writer.write(idx, bbuf.array())
+      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
+      bbuf.order(ByteOrder.BIG_ENDIAN)
+      bbuf.put(isNullCol)
+      if (isNullCol == 0x01.toByte) {

Review Comment:
   @anishshri-db 
   Maybe I wasn't very clear. What I meant was "consistently" prepending a 
single byte for nullability, regardless of whether the value is null or not. 
   
   If the value is null, set the null byte to 0x00 and fill the remaining 
bytes with 0x00.
   If the value is non-null, set the null byte to 0x01 and fill the remaining 
bytes with the actual value.
   
   In any case, `[null byte][actual bytes]` per range column.
   
   With this, you can keep the ordering even when there is a null value in the 
middle of a column. It's just that we decide in advance whether the ordering is 
"null first" or "null last", because the decision is stored "physically".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to