anishshri-db commented on code in PR #45709:
URL: https://github.com/apache/spark/pull/45709#discussion_r1540170826


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/TimerSuite.scala:
##########
@@ -99,15 +102,69 @@ class TimerSuite extends StateVariableSuiteBase {
       ImplicitGroupingKeyTracker.removeImplicitKey()
 
       ImplicitGroupingKeyTracker.setImplicitKey("test_key1")
-      assert(timerState1.getExpiredTimers().toSet ===
-        Set(("test_key2", 15000L), ("test_key1", 2000L), ("test_key1", 1000L)))
+      assert(timerState1.getExpiredTimers(Long.MaxValue).toSeq ===
+        Seq(("test_key1", 1000L), ("test_key1", 2000L), ("test_key2", 15000L)))
+      assert(timerState1.getExpiredTimers(10000L).toSeq ===
+        Seq(("test_key1", 1000L), ("test_key1", 2000L)))
       assert(timerState1.listTimers().toSet === Set(1000L, 2000L))
       ImplicitGroupingKeyTracker.removeImplicitKey()
 
       ImplicitGroupingKeyTracker.setImplicitKey("test_key2")
       assert(timerState2.listTimers().toSet === Set(15000L))
-      assert(timerState2.getExpiredTimers().toSet ===
-        Set(("test_key2", 15000L), ("test_key1", 2000L), ("test_key1", 1000L)))
+      assert(timerState2.getExpiredTimers(1500L).toSeq === Seq(("test_key1", 
1000L)))
+    }
+  }
+
+  testWithTimeOutMode("Range scan on second index timer key - " +
+    "verify timestamp is sorted for single instance") { timeoutMode =>
+    tryWithProviderResource(newStoreProviderWithStateVariable(true)) { 
provider =>
+      val store = provider.getStore(0)
+
+      ImplicitGroupingKeyTracker.setImplicitKey("test_key")
+      val timerState = new TimerStateImpl(store, timeoutMode,
+        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]])
+      val timerTimerstamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L, 
3L, 35L, 6L, 9L, 5L)
+      // register/put unordered timestamp into rocksDB
+      timerTimerstamps.foreach(timerState.registerTimer)
+      assert(timerState.getExpiredTimers(Long.MaxValue).toSeq.map(_._2) === 
timerTimerstamps.sorted)
+      assert(timerState.getExpiredTimers(4200L).toSeq.map(_._2) ===
+        timerTimerstamps.sorted.takeWhile(_ < 4200L))
+      assert(timerState.getExpiredTimers(Long.MinValue).toSeq === Seq.empty)
+      ImplicitGroupingKeyTracker.removeImplicitKey()
+    }
+  }
+
+  testWithTimeOutMode("test range scan on second index timer key - " +
+    "verify timestamp is sorted for multiple instances") { timeoutMode =>
+    tryWithProviderResource(newStoreProviderWithStateVariable(true)) { 
provider =>
+      val store = provider.getStore(0)
+
+      ImplicitGroupingKeyTracker.setImplicitKey("test_key1")
+      val timerState1 = new TimerStateImpl(store, timeoutMode,
+        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]])
+      val timerTimestamps1 = Seq(64L, 32L, 1024L, 4096L, 0L, 1L)
+      timerTimestamps1.foreach(timerState1.registerTimer)
+
+      val timerState2 = new TimerStateImpl(store, timeoutMode,
+        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]])
+      val timerTimestamps2 = Seq(931L, 8000L, 452300L, 4200L)
+      timerTimestamps2.foreach(timerState2.registerTimer)
+      ImplicitGroupingKeyTracker.removeImplicitKey()
+
+      ImplicitGroupingKeyTracker.setImplicitKey("test_key3")
+      val timerState3 = new TimerStateImpl(store, timeoutMode,
+        Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]])
+      val timerTimerStamps3 = Seq(1L, 2L, 8L, 3L)
+      timerTimerStamps3.foreach(timerState3.registerTimer)
+      ImplicitGroupingKeyTracker.removeImplicitKey()
+
+      ImplicitGroupingKeyTracker.setImplicitKey("test_key1")

Review Comment:
   If we remove setting the implicit key just for this section (for the calls
to getExpiredTimers), would the test still work?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to