HeartSaVioR commented on a change in pull request #34502:
URL: https://github.com/apache/spark/pull/34502#discussion_r751806678



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
##########
@@ -196,6 +210,29 @@ class RocksDB(
     }
   }
 
+  private def countKeys(): Long = {
+    // This is being called when opening DB, so doesn't need to deal with 
writeBatch.
+    val iter = db.newIterator()
+    logInfo(s"Counting keys - getting iterator from version $loadedVersion")
+    iter.seekToFirst()
+
+    // Attempt to close this iterator if there is a task failure, or a task 
interruption.
+    // This is a hack because it assumes that the RocksDB is running inside a 
task.
+    Option(TaskContext.get()).foreach { tc =>
+      tc.addTaskCompletionListener[Unit] { _ => iter.close() }
+    }

Review comment:
       Nice finding! I was blindly following the iterator() method, my bad.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to