anishshri-db commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1536560188
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -271,97 +277,97 @@ class RangeKeyScanStateEncoder(
   // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN encoding
   // using byte arrays.
+  // To handle "null" values, we prepend a byte to the byte array indicating whether the value
+  // is null or not. If the value is null, we write the null byte followed by a zero byte.
+  // If the value is not null, we write the null byte followed by the value.
+  // Note that setting null for the index on the unsafeRow is not feasible as it would change
+  // the sorting order on iteration.
private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
val writer = new UnsafeRowWriter(numOrderingCols)
writer.resetRowWriter()
rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
val value = row.get(idx, field.dataType)
- field.dataType match {
-        // endian-ness doesn't matter for single byte objects. so just write these
-        // types directly.
- case BooleanType => writer.write(idx, value.asInstanceOf[Boolean])
- case ByteType => writer.write(idx, value.asInstanceOf[Byte])
-
- // for other multi-byte types, we need to convert to big-endian
- case ShortType =>
- val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
- bbuf.order(ByteOrder.BIG_ENDIAN)
- bbuf.putShort(value.asInstanceOf[Short])
- writer.write(idx, bbuf.array())
-
- case IntegerType =>
- val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
- bbuf.order(ByteOrder.BIG_ENDIAN)
- bbuf.putInt(value.asInstanceOf[Int])
- writer.write(idx, bbuf.array())
-
- case LongType =>
- val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
- bbuf.order(ByteOrder.BIG_ENDIAN)
- bbuf.putLong(value.asInstanceOf[Long])
- writer.write(idx, bbuf.array())
-
- case FloatType =>
- val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
- bbuf.order(ByteOrder.BIG_ENDIAN)
- bbuf.putFloat(value.asInstanceOf[Float])
- writer.write(idx, bbuf.array())
-
- case DoubleType =>
- val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
- bbuf.order(ByteOrder.BIG_ENDIAN)
- bbuf.putDouble(value.asInstanceOf[Double])
- writer.write(idx, bbuf.array())
+ val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+ val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
+ bbuf.order(ByteOrder.BIG_ENDIAN)
+ bbuf.put(isNullCol)
+ if (isNullCol == 0x01.toByte) {
Review Comment:
@HeartSaVioR - that is what I am doing, right? If you check the line above, I am
always adding the `isNullCol` byte for each column; only the policy is inverted:
- if the value is null, set the null byte to 0x01 and fill the remaining bytes
with 0x00 (currently I am writing 0x00 for the next byte, but even that is not
required because `ByteBuffer.allocate` already initializes those bytes to zero,
and regardless we won't use them)
- if the value is non-null, set the null byte to 0x00 and fill the remaining
bytes with the actual value

Line here:
https://github.com/apache/spark/pull/45503/files#diff-e7c05dcd29276ad2ad3469481491b0c30ea328d43dee37d7e2f33e65096e8f51R296
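
For illustration, here is a minimal, self-contained Scala sketch of the
marker-byte scheme (hypothetical names like `encodeInt`, not the PR's actual
code), encoding a single nullable Int column:

```scala
import java.nio.{ByteBuffer, ByteOrder}

object NullMarkerEncodingSketch {
  // Encode a nullable Int as [marker byte][4 payload bytes].
  // Marker 0x01 means null; the payload stays as the zeros that
  // ByteBuffer.allocate already wrote. Marker 0x00 means non-null;
  // the payload is the big-endian value.
  def encodeInt(value: Option[Int]): Array[Byte] = {
    val bbuf = ByteBuffer.allocate(1 + java.lang.Integer.BYTES)
    bbuf.order(ByteOrder.BIG_ENDIAN)
    bbuf.put(if (value.isEmpty) 0x01.toByte else 0x00.toByte)
    value.foreach(bbuf.putInt) // skipped for null: zero-fill is enough
    bbuf.array()
  }

  // Byte-wise unsigned comparison, like a lexicographic key comparator.
  def unsignedCompare(a: Array[Byte], b: Array[Byte]): Int =
    a.zip(b).map { case (x, y) => (x & 0xff) - (y & 0xff) }
      .find(_ != 0).getOrElse(a.length - b.length)

  def main(args: Array[String]): Unit = {
    // With this marker choice, null keys (0x01) sort after all
    // non-null keys (0x00) for non-negative payloads.
    assert(unsignedCompare(encodeInt(Some(42)), encodeInt(None)) < 0)
    assert(unsignedCompare(encodeInt(Some(1)), encodeInt(Some(2))) < 0)
  }
}
```

Because the marker is part of the encoded bytes, a byte-wise comparator orders
null and non-null keys consistently, which is why relying on the UnsafeRow's
own null bits would change the sort order on iteration.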
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]