sahnib commented on code in PR #45937:
URL: https://github.com/apache/spark/pull/45937#discussion_r1561381648


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TTLState.scala:
##########
@@ -105,17 +109,21 @@ abstract class SingleKeyTTLStateImpl(
   /**
    * Clears any state which has ttl older than [[ttlExpirationMs]].
    */
-  override def clearExpiredState(): Unit = {
+  override def clearExpiredState(): Long = {
     val iterator = store.iterator(ttlColumnFamilyName)
+    var numValuesExpired = 0L
 
     iterator.takeWhile { kv =>
       val expirationMs = kv.key.getLong(0)
       StateTTL.isExpired(expirationMs, ttlExpirationMs)
     }.foreach { kv =>
       val groupingKey = kv.key.getBinary(1)
-      clearIfExpired(groupingKey)
+      if (clearIfExpired(groupingKey)) {
+        numValuesExpired += 1
+      }

Review Comment:
   This might not work for ListState, since several values in the list can 
expire for a single grouping key and this loop would count them as one. Should 
we calculate this metric inside the state variable itself, for future 
compatibility with ListState?
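
To illustrate the concern, here is a minimal, self-contained sketch in which each state variable returns the number of values it removed and the caller only sums those counts. Everything in it is hypothetical (`TtlStateSketch`, `ValueStateSketch`, `ListStateSketch`, the `String` keys, and the `isExpired` stand-in); it models the idea only and is not the actual Spark `StateStore` or `TTLState` API.

```scala
import scala.collection.mutable

// Hypothetical, simplified model; none of these types are Spark classes.
object TtlExpiryCountSketch {
  // Assumed stand-in for StateTTL.isExpired: expired once the expiration time has passed.
  def isExpired(expirationMs: Long, nowMs: Long): Boolean = expirationMs <= nowMs

  // Each state variable reports how many values it removed for a grouping key.
  trait TtlStateSketch {
    def clearIfExpired(groupingKey: String, nowMs: Long): Long
  }

  // Single value per key: the count is always 0 or 1.
  final class ValueStateSketch extends TtlStateSketch {
    private val values = mutable.Map.empty[String, Long] // key -> expirationMs

    def put(key: String, expirationMs: Long): Unit = values.update(key, expirationMs)

    override def clearIfExpired(groupingKey: String, nowMs: Long): Long =
      values.get(groupingKey) match {
        case Some(exp) if isExpired(exp, nowMs) =>
          values.remove(groupingKey)
          1L
        case _ => 0L
      }
  }

  // Many values per key: several of them can expire in one call.
  final class ListStateSketch extends TtlStateSketch {
    private val values = mutable.Map.empty[String, List[Long]] // key -> expirationMs list

    def append(key: String, expirationMs: Long): Unit =
      values.update(key, values.getOrElse(key, Nil) :+ expirationMs)

    override def clearIfExpired(groupingKey: String, nowMs: Long): Long = {
      val (expired, live) = values.getOrElse(groupingKey, Nil).partition(isExpired(_, nowMs))
      if (live.isEmpty) {
        values.remove(groupingKey)
      } else {
        values.update(groupingKey, live)
      }
      expired.size.toLong
    }
  }

  // The caller sums the per-variable counts instead of adding 1 per grouping key.
  def clearExpiredState(state: TtlStateSketch, keys: Seq[String], nowMs: Long): Long =
    keys.map(key => state.clearIfExpired(key, nowMs)).sum
}
```

With this shape, a list holding two expired values and one live value for the same grouping key would contribute 2 to the metric, whereas a Boolean return would contribute 1.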



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

