neilramaswamy commented on code in PR #45905:
URL: https://github.com/apache/spark/pull/45905#discussion_r1555241781


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -447,6 +492,97 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  testWithColumnFamilies("rocksdb range scan multiple non-contiguous ordering columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled ) { colFamiliesEnabled =>
+    val testSchema: StructType = StructType(
+      Seq(
+        StructField("ordering-1", LongType, false),
+        StructField("key2", StringType, false),
+        StructField("ordering-2", IntegerType, false),
+        StructField("string-2", StringType, false),
+        StructField("ordering-3", DoubleType, false)
+      )
+    )
+
+    val testSchemaProj = UnsafeProjection.create(Array[DataType](
+        immutable.ArraySeq.unsafeWrapArray(testSchema.fields.map(_.dataType)): _*))
+    val rangeScanOrdinals = Seq(0, 2, 4)
+
+    tryWithProviderResource(
+      newStoreProvider(
+        testSchema,
+        RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals),
+        colFamiliesEnabled
+      )
+    ) { provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(
+          cfName,
+          testSchema,
+          valueSchema,
+          RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals)
+        )
+      }
+
+      val orderedInput = Seq(
+        // Make sure that the first column takes precedence, even if the
+        // later columns are greater
+        (-2L, 0, 99.0),
+        (-1L, 0, 98.0),
+        (0L, 0, 97.0),
+        (2L, 0, 96.0),
+        // Make sure that the second column takes precedence, when the first
+        // column is all the same
+        (3L, -2, -1.0),
+        (3L, -1, -2.0),
+        (3L, 0, -3.0),
+        (3L, 2, -4.0),
+        // Finally, make sure that the third column takes precedence, when the
+        // first two ordering columns are the same.
+        (4L, -1, -127.0),
+        (4L, -1, 0.0),
+        (4L, -1, 64.0),
+        (4L, -1, 127.0)
+      )
+      val scrambledInput = Random.shuffle(orderedInput)
+
+      scrambledInput.foreach { record =>
+        val keyRow = testSchemaProj.apply(
+          new GenericInternalRow(
+            Array[Any](
+              record._1,
+              UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString),
+              record._2,
+              UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString),
+              record._3
+            )
+          )
+        )
+
+        // The value is just a "dummy" value of 1
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      // scalastyle:off

Review Comment:
   Just a typo!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to