HeartSaVioR commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1536546941


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -271,97 +277,97 @@ class RangeKeyScanStateEncoder(
 
   // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN 
encoding
   // using byte arrays.
+  // To handle "null" values, we prepend a byte to the byte array indicating 
whether the value
+  // is null or not. If the value is null, we write the null byte followed by 
a zero byte.
+  // If the value is not null, we write the null byte followed by the value.
+  // Note that setting null for the index on the unsafeRow is not feasible as 
it would change
+  // the sorting order on iteration.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      field.dataType match {
-        // endian-ness doesn't matter for single byte objects. so just write 
these
-        // types directly.
-        case BooleanType => writer.write(idx, value.asInstanceOf[Boolean])
-        case ByteType => writer.write(idx, value.asInstanceOf[Byte])
-
-        // for other multi-byte types, we need to convert to big-endian
-        case ShortType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putShort(value.asInstanceOf[Short])
-          writer.write(idx, bbuf.array())
-
-        case IntegerType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putInt(value.asInstanceOf[Int])
-          writer.write(idx, bbuf.array())
-
-        case LongType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putLong(value.asInstanceOf[Long])
-          writer.write(idx, bbuf.array())
-
-        case FloatType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putFloat(value.asInstanceOf[Float])
-          writer.write(idx, bbuf.array())
-
-        case DoubleType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putDouble(value.asInstanceOf[Double])
-          writer.write(idx, bbuf.array())
+      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
+      bbuf.order(ByteOrder.BIG_ENDIAN)
+      bbuf.put(isNullCol)
+      if (isNullCol == 0x01.toByte) {

Review Comment:
   I'm less worried about overhead on adding a byte consistently per column, 
unless we will have 100s of columns on ordering.
   (I know it's 25% of overhead in numeric and 100% of overhead for boolean, 
which sounds like significant. But as we are here to support null case, 
probably better not to support partially but do full support.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to