uncleGen commented on a change in pull request #28781:
URL: https://github.com/apache/spark/pull/28781#discussion_r509034111
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
##########
@@ -18,104 +18,139 @@
package org.apache.spark.sql.streaming.ui
import java.util.UUID
-import java.util.concurrent.ConcurrentHashMap
-import scala.collection.JavaConverters._
import scala.collection.mutable
+import com.fasterxml.jackson.annotation.JsonIgnore
+
import org.apache.spark.SparkConf
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.streaming.{StreamingQueryListener,
StreamingQueryProgress}
+import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper._
import org.apache.spark.sql.streaming.ui.UIUtils.parseProgressTimestamp
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.kvstore.KVIndex
/**
* A customized StreamingQueryListener used in structured streaming UI, which
contains all
* UI data for both active and inactive query.
- * TODO: Add support for history server.
*/
-private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends
StreamingQueryListener {
-
- /**
- * We use runId as the key here instead of id in active query status map,
- * because the runId is unique for every started query, even it its a
restart.
- */
- private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID,
StreamingQueryUIData]()
- private[ui] val inactiveQueryStatus = new
mutable.Queue[StreamingQueryUIData]()
+private[sql] class StreamingQueryStatusListener(
+ conf: SparkConf,
+ store: ElementTrackingStore) extends StreamingQueryListener {
private val streamingProgressRetention =
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES)
private val inactiveQueryStatusRetention =
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES)
+ store.addTrigger(classOf[StreamingQuerySummary],
inactiveQueryStatusRetention) { count =>
+ cleanupInactiveQueries(count)
+ }
+
+ private val queryToProgress = new mutable.HashMap[UUID,
mutable.Queue[String]]()
+
+ private def cleanupInactiveQueries(count: Long): Unit = {
+ val countToDelete = count - inactiveQueryStatusRetention
+ if (countToDelete <= 0) {
+ return
+ }
+
+ val view =
store.view(classOf[StreamingQuerySummary]).index("active").first(false).last(false)
+ val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_ => true)
+ toDelete.foreach { e =>
+ store.delete(e.getClass, e.runId)
Review comment:
The number of inactive queries should be bounded in production, so there should not be inactive queries to delete on every trigger interval. We then use
`removeAllByIndexValues` to delete all of a query's progress entries in a single batch.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]