HeartSaVioR commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1536547158


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -271,97 +277,97 @@ class RangeKeyScanStateEncoder(
 
   // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN encoding
   // using byte arrays.
+  // To handle "null" values, we prepend a byte to the byte array indicating whether the value
+  // is null or not. If the value is null, we write the null byte followed by a zero byte.
+  // If the value is not null, we write the null byte followed by the value.
+  // Note that setting null for the index on the unsafeRow is not feasible as it would change
+  // the sorting order on iteration.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      field.dataType match {
-        // endian-ness doesn't matter for single byte objects. so just write these
-        // types directly.
-        case BooleanType => writer.write(idx, value.asInstanceOf[Boolean])
-        case ByteType => writer.write(idx, value.asInstanceOf[Byte])
-
-        // for other multi-byte types, we need to convert to big-endian
-        case ShortType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putShort(value.asInstanceOf[Short])
-          writer.write(idx, bbuf.array())
-
-        case IntegerType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putInt(value.asInstanceOf[Int])
-          writer.write(idx, bbuf.array())
-
-        case LongType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putLong(value.asInstanceOf[Long])
-          writer.write(idx, bbuf.array())
-
-        case FloatType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putFloat(value.asInstanceOf[Float])
-          writer.write(idx, bbuf.array())
-
-        case DoubleType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putDouble(value.asInstanceOf[Double])
-          writer.write(idx, bbuf.array())
+      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
+      bbuf.order(ByteOrder.BIG_ENDIAN)
+      bbuf.put(isNullCol)
+      if (isNullCol == 0x01.toByte) {

Review Comment:
   @neilramaswamy @sahnib Does the above proposal make sense to you as well? I just want to make sure I'm not missing something.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to