anishshri-db commented on code in PR #45709:
URL: https://github.com/apache/spark/pull/45709#discussion_r1540147143
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala:
##########
@@ -188,17 +187,20 @@ class TimerStateImpl(
/**
* Function to get all the registered timers for all grouping keys
+ * @param expiryTimestampMs Threshold for expired timestamp in milliseconds, this function
+ * will return every timers that has (strictly) smaller timestamp
* @return - iterator of all the registered timers for all grouping keys
*/
- def getExpiredTimers(): Iterator[(Any, Long)] = {
+ def getExpiredTimers(expiryTimestampMs: Long): Iterator[(Any, Long)] = {
+ // this iter is increasingly sorted on timestamp
val iter = store.iterator(tsToKeyCFName)
new NextIterator[(Any, Long)] {
override protected def getNext(): (Any, Long) = {
- if (iter.hasNext) {
- val rowPair = iter.next()
- val keyRow = rowPair.key
- val result = getTimerRowFromSecIndex(keyRow)
+ val rowPair = if (iter.hasNext) iter.next() else null
Review Comment:
Let's keep the original `if (iter.hasNext)` condition as it is, and add a
condition inside that branch to return either `null` or `result`?
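
One possible reading of this suggestion, as a self-contained sketch rather than the actual patch: the outer `if (iter.hasNext)` stays, and the expiry check goes inside it. `MiniNextIterator` is a hypothetical stand-in for Spark's `NextIterator`, and the sorted `(timestampMs, key)` iterator is a stand-in for `store.iterator(tsToKeyCFName)` plus `getTimerRowFromSecIndex`; neither is taken from the PR.

```scala
// Hypothetical, simplified stand-in for Spark's NextIterator so the sketch runs on its own.
abstract class MiniNextIterator[U] extends Iterator[U] {
  private var gotNext = false
  private var nextValue: U = null.asInstanceOf[U]
  protected var finished = false

  // Subclasses return the next element, or set `finished = true` and return null.
  protected def getNext(): U

  override def hasNext: Boolean = {
    if (!finished && !gotNext) {
      nextValue = getNext()
      gotNext = true
    }
    !finished
  }

  override def next(): U = {
    if (!hasNext) throw new NoSuchElementException("End of stream")
    gotNext = false
    nextValue
  }
}

object ExpiredTimersSketch {
  // `sortedTimers` is assumed to be sorted by increasing timestamp,
  // standing in for the secondary-index iterator in TimerStateImpl.
  def getExpiredTimers(
      sortedTimers: Iterator[(Long, Any)],
      expiryTimestampMs: Long): Iterator[(Any, Long)] = {
    new MiniNextIterator[(Any, Long)] {
      override protected def getNext(): (Any, Long) = {
        if (sortedTimers.hasNext) {
          val (timestampMs, key) = sortedTimers.next()
          if (timestampMs < expiryTimestampMs) {
            // Strictly smaller than the threshold: this timer has expired.
            (key, timestampMs)
          } else {
            // Input is sorted, so no later entry can be expired either.
            finished = true
            null
          }
        } else {
          finished = true
          null
        }
      }
    }
  }
}
```

With this shape the original `if`/`else` skeleton is unchanged, and the early exit on the first non-expired timestamp is confined to the inner branch.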