anishshri-db commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1536561314
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -271,97 +277,97 @@ class RangeKeyScanStateEncoder(
// Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN
encoding
// using byte arrays.
+ // To handle "null" values, we prepend a byte to the byte array indicating
whether the value
+ // is null or not. If the value is null, we write the null byte followed by
a zero byte.
+ // If the value is not null, we write the null byte followed by the value.
+ // Note that setting null for the index on the unsafeRow is not feasible as
it would change
+ // the sorting order on iteration.
private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
val writer = new UnsafeRowWriter(numOrderingCols)
writer.resetRowWriter()
rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
val value = row.get(idx, field.dataType)
- field.dataType match {
- // endian-ness doesn't matter for single byte objects. so just write
these
- // types directly.
- case BooleanType => writer.write(idx, value.asInstanceOf[Boolean])
- case ByteType => writer.write(idx, value.asInstanceOf[Byte])
-
- // for other multi-byte types, we need to convert to big-endian
- case ShortType =>
- val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
- bbuf.order(ByteOrder.BIG_ENDIAN)
- bbuf.putShort(value.asInstanceOf[Short])
- writer.write(idx, bbuf.array())
-
- case IntegerType =>
- val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
- bbuf.order(ByteOrder.BIG_ENDIAN)
- bbuf.putInt(value.asInstanceOf[Int])
- writer.write(idx, bbuf.array())
-
- case LongType =>
- val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
- bbuf.order(ByteOrder.BIG_ENDIAN)
- bbuf.putLong(value.asInstanceOf[Long])
- writer.write(idx, bbuf.array())
-
- case FloatType =>
- val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
- bbuf.order(ByteOrder.BIG_ENDIAN)
- bbuf.putFloat(value.asInstanceOf[Float])
- writer.write(idx, bbuf.array())
-
- case DoubleType =>
- val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
- bbuf.order(ByteOrder.BIG_ENDIAN)
- bbuf.putDouble(value.asInstanceOf[Double])
- writer.write(idx, bbuf.array())
+ val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+ val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
+ bbuf.order(ByteOrder.BIG_ENDIAN)
+ bbuf.put(isNullCol)
+ if (isNullCol == 0x01.toByte) {
Review Comment:
Yup done
Here is the allocate docs -
```
Allocates a new byte buffer.
The new buffer's position will be zero, its limit will be its capacity, its
mark will be undefined, and each of its elements will be initialized to zero.
It will have a backing array and its array offset will be zero.
```
So I believe should be safe
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]