HeartSaVioR commented on code in PR #45778:
URL: https://github.com/apache/spark/pull/45778#discussion_r1548877778
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -276,53 +284,111 @@ class RangeKeyScanStateEncoder(
rangeScanKeyProjection(key)
}
+ // bit masks used for checking sign or flipping all bits for negative
float/double values
+ private val floatFlipBitMask = 0xFFFFFFFF
+ private val floatSignBitMask = 0x80000000
+
+ private val doubleFlipBitMask = 0xFFFFFFFFFFFFFFFFL
+ private val doubleSignBitMask = 0x8000000000000000L
+
+ // Byte markers used to identify whether the value is null, negative or
positive
+ // To ensure sorted ordering, we use the lowest byte value for negative
numbers followed by
+ // positive numbers and then null values.
+ private val negativeValMarker: Byte = 0x00.toByte
+ private val positiveValMarker: Byte = 0x01.toByte
+ private val nullValMarker: Byte = 0x02.toByte
+
// Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN
encoding
// using byte arrays.
// To handle "null" values, we prepend a byte to the byte array indicating
whether the value
- // is null or not. If the value is null, we write the null byte followed by
a zero byte.
+ // is null or not. If the value is null, we write the null byte followed by
zero bytes.
// If the value is not null, we write the null byte followed by the value.
// Note that setting null for the index on the unsafeRow is not feasible as
it would change
// the sorting order on iteration.
+ // Also note that the same byte is used to indicate whether the value is
negative or not.
private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
val writer = new UnsafeRowWriter(numOrderingCols)
writer.resetRowWriter()
rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
val value = row.get(idx, field.dataType)
- val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
// Note that we cannot allocate a smaller buffer here even if the value
is null
// because the effective byte array is considered variable size and
needs to have
// the same size across all rows for the ordering to work as expected.
val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
bbuf.order(ByteOrder.BIG_ENDIAN)
- bbuf.put(isNullCol)
- if (isNullCol == 0x01.toByte) {
+ if (value == null) {
+ bbuf.put(nullValMarker)
writer.write(idx, bbuf.array())
} else {
field.dataType match {
case BooleanType =>
case ByteType =>
+ bbuf.put(positiveValMarker)
bbuf.put(value.asInstanceOf[Byte])
writer.write(idx, bbuf.array())
- // for other multi-byte types, we need to convert to big-endian
case ShortType =>
Review Comment:
nit: Sorry for nitpicking, but as we do explicit type casting twice, can we
do the following?
```case s: ShortType =>```
Apply this to all types except those matched together with other types (like
BooleanType/ByteType above).
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -294,6 +295,60 @@ class RocksDBStateStoreSuite extends
StateStoreSuiteBase[RocksDBStateStoreProvid
}
}
+ testWithColumnFamilies("rocksdb range scan - variable size non-ordering
columns with " +
+ "double type values are supported",
+ TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled
=>
+
+ val testSchema: StructType = StructType(
+ Seq(StructField("key1", DoubleType, false),
+ StructField("key2", StringType, false)))
+
+ val schemaProj = UnsafeProjection.create(Array[DataType](DoubleType,
StringType))
+ tryWithProviderResource(newStoreProvider(testSchema,
+ RangeKeyScanStateEncoderSpec(testSchema, 1), colFamiliesEnabled)) {
provider =>
+ val store = provider.getStore(0)
+
+ val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+ if (colFamiliesEnabled) {
+ store.createColFamilyIfAbsent(cfName,
+ testSchema, valueSchema,
+ RangeKeyScanStateEncoderSpec(testSchema, 1))
+ }
+
+ // Verify that the sort ordering here is as follows:
+ // -NaN, -Infinity, -ve values, -0, 0, +0, +ve values, +Infinity, +NaN
Review Comment:
nit: This test does not verify that the ordering takes NaN into account — do
I understand correctly? If so, let's update the code comment to clarify.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]