HeartSaVioR commented on code in PR #45905:
URL: https://github.com/apache/spark/pull/45905#discussion_r1555208836


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -229,16 +229,19 @@ class PrefixKeyScanStateEncoder(
  * here: https://en.wikipedia.org/wiki/IEEE_754#Design_rationale
  *
  * @param keySchema - schema of the key to be encoded
- * @param numOrderingCols - number of columns to be used for range scan
+ * @param orderingOrdinals - the ordinals for which the range scan is 
constructed
  */
 class RangeKeyScanStateEncoder(
     keySchema: StructType,
-    numOrderingCols: Int) extends RocksDBKeyStateEncoder {
+    orderingOrdinals: Seq[Int]) extends RocksDBKeyStateEncoder {
 
   import RocksDBStateEncoder._
 
-  private val rangeScanKeyFieldsWithIdx: Seq[(StructField, Int)] = {
-    keySchema.zipWithIndex.take(numOrderingCols)
+  private val rangeScanKeyFieldsWithOrdinal: Seq[(StructField, Int)] = {
+    orderingOrdinals.map(ordinal => {

Review Comment:
   nit: use `{ ordinal =>` here instead of `(ordinal => {`
   
   
https://github.com/databricks/scala-style-guide?tab=readme-ov-file#anonymous-methods



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -248,34 +251,56 @@ class RangeKeyScanStateEncoder(
   }
 
   // verify that only fixed sized columns are used for ordering
-  rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
+  rangeScanKeyFieldsWithOrdinal.foreach { case (field, ordinal) =>
     if (!isFixedSize(field.dataType)) {
       // NullType is technically fixed size, but not supported for ordering
       if (field.dataType == NullType) {
-        throw StateStoreErrors.nullTypeOrderingColsNotSupported(field.name, 
idx.toString)
+        throw StateStoreErrors.nullTypeOrderingColsNotSupported(field.name, 
ordinal.toString)
       } else {
-        throw 
StateStoreErrors.variableSizeOrderingColsNotSupported(field.name, idx.toString)
+        throw 
StateStoreErrors.variableSizeOrderingColsNotSupported(field.name, 
ordinal.toString)
       }
     }
   }
 
-  private val remainingKeyFieldsWithIdx: Seq[(StructField, Int)] = {
-    keySchema.zipWithIndex.drop(numOrderingCols)
+  private val remainingKeyFieldsWithOrdinal: Seq[(StructField, Int)] = {
+    0.to(keySchema.length - 1).diff(orderingOrdinals).map(ordinal => {

Review Comment:
   nit: same, `{ ordinal =>`



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -447,6 +492,97 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  testWithColumnFamilies("rocksdb range scan multiple non-contiguous ordering 
columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled ) { 
colFamiliesEnabled =>
+    val testSchema: StructType = StructType(
+      Seq(
+        StructField("ordering-1", LongType, false),
+        StructField("key2", StringType, false),
+        StructField("ordering-2", IntegerType, false),
+        StructField("string-2", StringType, false),
+        StructField("ordering-3", DoubleType, false)
+      )
+    )
+
+    val testSchemaProj = UnsafeProjection.create(Array[DataType](
+        immutable.ArraySeq.unsafeWrapArray(testSchema.fields.map(_.dataType)): 
_*))
+    val rangeScanOrdinals = Seq(0, 2, 4)
+
+    tryWithProviderResource(
+      newStoreProvider(
+        testSchema,
+        RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals),
+        colFamiliesEnabled
+      )
+    ) { provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(
+          cfName,
+          testSchema,
+          valueSchema,
+          RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals)
+        )
+      }
+
+      val orderedInput = Seq(
+        // Make sure that the first column takes precedence, even if the
+        // later columns are greater
+        (-2L, 0, 99.0),
+        (-1L, 0, 98.0),
+        (0L, 0, 97.0),
+        (2L, 0, 96.0),
+        // Make sure that the second column takes precedence, when the first
+        // column is all the same
+        (3L, -2, -1.0),
+        (3L, -1, -2.0),
+        (3L, 0, -3.0),
+        (3L, 2, -4.0),
+        // Finally, make sure that the third column takes precedence, when the
+        // first two ordering columns are the same.
+        (4L, -1, -127.0),
+        (4L, -1, 0.0),
+        (4L, -1, 64.0),
+        (4L, -1, 127.0)
+      )
+      val scrambledInput = Random.shuffle(orderedInput)
+
+      scrambledInput.foreach { record =>
+        val keyRow = testSchemaProj.apply(
+          new GenericInternalRow(
+            Array[Any](
+              record._1,
+              
UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 
1).mkString),
+              record._2,
+              
UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 
1).mkString),
+              record._3
+            )
+          )
+        )
+
+        // The value is just a "dummy" value of 1
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      // scalastyle:off

Review Comment:
   Any reason you had to disable the style check? When you disable it for a valid 
reason, please be specific and disable only the single rule involved, and also make sure to 
re-enable it as soon as it no longer needs to be disabled.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to