xuanyuanking commented on a change in pull request #28781:
URL: https://github.com/apache/spark/pull/28781#discussion_r533373596



##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
##########
@@ -112,4 +115,104 @@ class StreamingQueryStatusListenerSuite extends 
StreamTest {
     
assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.runId
 == runId0)
     
assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.id 
== id)
   }
+
+  test("test small retained queries") {
+    val kvStore = new ElementTrackingStore(new InMemoryStore(), sparkConf)
+    val conf = spark.sparkContext.conf
+    conf.set(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES.key, "2")
+    val listener = new StreamingQueryStatusListener(conf, kvStore)
+    val queryStore = new StreamingQueryStatusStore(kvStore)
+
+    def addNewQuery(): (UUID, UUID) = {
+      val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // 
ISO8601
+      format.setTimeZone(getTimeZone("UTC"))
+      val id = UUID.randomUUID()
+      val runId = UUID.randomUUID()
+      val startEvent = new StreamingQueryListener.QueryStartedEvent(
+        id, runId, "test1", format.format(new 
Date(System.currentTimeMillis())))
+      listener.onQueryStarted(startEvent)
+      (id, runId)
+    }
+
+    val (id1, runId1) = addNewQuery()
+    val (id2, runId2) = addNewQuery()
+    val (id3, runId3) = addNewQuery()
+
+    assert(queryStore.allQueryUIData.count(!_.summary.isActive) == 0)
+
+    val terminateEvent1 = new StreamingQueryListener.QueryTerminatedEvent(id1, 
runId1, None)
+    listener.onQueryTerminated(terminateEvent1)
+    // sleep 100 mills to make sure clean work complete
+    Thread.sleep(100)

Review comment:
       Fixed sleeps like this will make the tests flaky. Try changing them to a 
conditional wait, e.g. `eventually(timeout(10.seconds))`.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
##########
@@ -20,102 +20,144 @@ package org.apache.spark.sql.streaming.ui
 import java.util.UUID
 import java.util.concurrent.ConcurrentHashMap
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable
 
+import com.fasterxml.jackson.annotation.JsonIgnore
+
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.streaming.{StreamingQueryListener, 
StreamingQueryProgress}
+import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper._
 import org.apache.spark.sql.streaming.ui.UIUtils.parseProgressTimestamp
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.kvstore.KVIndex
 
 /**
  * A customized StreamingQueryListener used in structured streaming UI, which 
contains all
  * UI data for both active and inactive query.
- * TODO: Add support for history server.
  */
-private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends 
StreamingQueryListener {
-
-  /**
-   * We use runId as the key here instead of id in active query status map,
-   * because the runId is unique for every started query, even it its a 
restart.
-   */
-  private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID, 
StreamingQueryUIData]()
-  private[ui] val inactiveQueryStatus = new 
mutable.Queue[StreamingQueryUIData]()
+private[sql] class StreamingQueryStatusListener(
+    conf: SparkConf,
+    store: ElementTrackingStore) extends StreamingQueryListener {
 
   private val streamingProgressRetention =
     conf.get(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES)
   private val inactiveQueryStatusRetention = 
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES)
 
+  store.addTrigger(classOf[StreamingQueryData], inactiveQueryStatusRetention) 
{ count =>
+    cleanupInactiveQueries(count)
+  }
+
+  // Events from the same query run will never be processed concurrently, so 
it's safe to
+  // access `progressIds` without any protection.
+  private val queryToProgress = new ConcurrentHashMap[UUID, 
mutable.Queue[String]]()
+
+  private def cleanupInactiveQueries(count: Long): Unit = {
+    val view = 
store.view(classOf[StreamingQueryData]).index("active").first(false).last(false)
+    val inactiveQueries = KVUtils.viewToSeq(view, Int.MaxValue)(_ => true)
+    val numInactiveQueries = inactiveQueries.size
+    if (numInactiveQueries <= inactiveQueryStatusRetention) {
+      return
+    }
+    val toDelete = inactiveQueries.sortBy(_.endTimestamp.get)
+      .take(numInactiveQueries - inactiveQueryStatusRetention)
+    val runIds = toDelete.map { e =>
+      store.delete(e.getClass, e.runId)
+      e.runId.toString
+    }
+    // Delete wrappers in one pass, as deleting them for each summary is slow
+    store.removeAllByIndexValues(classOf[StreamingQueryProgressWrapper], 
"runId", runIds)
+  }
+
   override def onQueryStarted(event: 
StreamingQueryListener.QueryStartedEvent): Unit = {
     val startTimestamp = parseProgressTimestamp(event.timestamp)
-    activeQueryStatus.putIfAbsent(event.runId,
-      new StreamingQueryUIData(event.name, event.id, event.runId, 
startTimestamp))
+    store.write(new StreamingQueryData(
+      event.name,
+      event.id,
+      event.runId,
+      isActive = true,
+      None,
+      startTimestamp
+    ), checkTriggers = true)
   }
 
   override def onQueryProgress(event: 
StreamingQueryListener.QueryProgressEvent): Unit = {
-    val batchTimestamp = parseProgressTimestamp(event.progress.timestamp)
-    val queryStatus = activeQueryStatus.getOrDefault(
-      event.progress.runId,
-      new StreamingQueryUIData(event.progress.name, event.progress.id, 
event.progress.runId,
-        batchTimestamp))
-    queryStatus.updateProcess(event.progress, streamingProgressRetention)
-  }
-
-  override def onQueryTerminated(
-      event: StreamingQueryListener.QueryTerminatedEvent): Unit = synchronized 
{
-    val queryStatus = activeQueryStatus.remove(event.runId)
-    if (queryStatus != null) {
-      queryStatus.queryTerminated(event)
-      inactiveQueryStatus += queryStatus
-      while (inactiveQueryStatus.length >= inactiveQueryStatusRetention) {
-        inactiveQueryStatus.dequeue()
-      }
+    val runId = event.progress.runId
+    val batchId = event.progress.batchId
+    val timestamp = event.progress.timestamp
+    if (!queryToProgress.containsKey(runId)) {
+      queryToProgress.put(runId, mutable.Queue.empty[String])
+    }
+    val progressIds = queryToProgress.get(runId)
+    progressIds.enqueue(getUniqueId(runId, batchId, timestamp))
+    store.write(new StreamingQueryProgressWrapper(event.progress))
+    while(progressIds.length > streamingProgressRetention) {

Review comment:
        Code style nit: use `while (` (add a space after `while`).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to