uncleGen commented on a change in pull request #28781:
URL: https://github.com/apache/spark/pull/28781#discussion_r437986587



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
##########
@@ -18,104 +18,161 @@
 package org.apache.spark.sql.streaming.ui
 
 import java.util.UUID
-import java.util.concurrent.ConcurrentHashMap
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import scala.collection.immutable.Queue
+
+import com.fasterxml.jackson.annotation.JsonIgnore
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.streaming.{StreamingQueryListener, 
StreamingQueryProgress}
+import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper._
 import org.apache.spark.sql.streaming.ui.UIUtils.parseProgressTimestamp
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.kvstore.KVIndex
 
 /**
  * A customized StreamingQueryListener used in structured streaming UI, which 
contains all
  * UI data for both active and inactive query.
  * TODO: Add support for history server.
  */
-private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends 
StreamingQueryListener {
-
-  /**
-   * We use runId as the key here instead of id in active query status map,
-   * because the runId is unique for every started query, even it its a 
restart.
-   */
-  private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID, 
StreamingQueryUIData]()
-  private[ui] val inactiveQueryStatus = new 
mutable.Queue[StreamingQueryUIData]()
+private[sql] class StreamingQueryStatusListener(
+    conf: SparkConf,
+    store: ElementTrackingStore) extends StreamingQueryListener {
 
   private val streamingProgressRetention =
     conf.get(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES)
   private val inactiveQueryStatusRetention = 
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES)
 
+  store.addTrigger(classOf[StreamingQueryUIData], 
inactiveQueryStatusRetention) { count =>

Review comment:
       Add a trigger to clean up inactive query data in the store according to
`StaticSQLConf.STREAMING_UI_RETAINED_QUERIES`.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
##########
@@ -35,12 +35,12 @@ import org.apache.spark.util.ListenerBus
  * and StreamingQueryManager. So this bus will dispatch events to registered 
listeners for only
  * those queries that were started in the associated SparkSession.
  */
-class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
+class StreamingQueryListenerBus(sparkListenerBus: Option[LiveListenerBus])

Review comment:
       Make `sparkListenerBus` optional. When loaded by the History Server,
Spark uses the `ReplayListenerBus` instead of the `LiveListenerBus`. In the live UI,
a streaming query posts its events to the `StreamingQueryListenerBus`, which
forwards them to the `LiveListenerBus`. The `StreamingQueryListenerBus` also
subscribes to `LiveListenerBus` events, so it receives the posted events back
on a different thread. In the history UI, the `StreamingQueryListenerBus`
subscribes to the `ReplayListenerBus` and processes those events directly
through `onOtherEvent()`.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
##########
@@ -110,7 +116,15 @@ class StreamingQueryListenerBus(sparkListenerBus: 
LiveListenerBus)
       listener: StreamingQueryListener,
       event: StreamingQueryListener.Event): Unit = {
     def shouldReport(runId: UUID): Boolean = {
-      activeQueryRunIds.synchronized { activeQueryRunIds.contains(runId) }
+      // When loaded by Spark History Server, we should process all event 
coming from replay
+      // listener bus.
+      if (sparkListenerBus.isEmpty) {

Review comment:
       In the live UI, streaming queries post events to the `StreamingQueryListenerBus`,
which manages the query status. In the history UI, however, the
`StreamingQueryListenerBus` subscribes to the `ReplayListenerBus`, so it should
process all events.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
##########
@@ -95,7 +95,13 @@ class StreamingQueryListenerBus(sparkListenerBus: 
LiveListenerBus)
         // synchronously and the ones attached to LiveListenerBus 
asynchronously. Therefore,
         // we need to ignore QueryStartedEvent if this method is called within 
SparkListenerBus
         // thread
-        if (!LiveListenerBus.withinListenerThread.value || 
!e.isInstanceOf[QueryStartedEvent]) {
+        //
+        // When loaded by Spark History Server, we should process all event 
coming from replay
+        // listener bus.
+        if (sparkListenerBus.isEmpty) {

Review comment:
       Check whether `sparkListenerBus` is defined to determine whether to
process all events.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
##########
@@ -18,104 +18,161 @@
 package org.apache.spark.sql.streaming.ui
 
 import java.util.UUID
-import java.util.concurrent.ConcurrentHashMap
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import scala.collection.immutable.Queue
+
+import com.fasterxml.jackson.annotation.JsonIgnore
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.streaming.{StreamingQueryListener, 
StreamingQueryProgress}
+import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper._
 import org.apache.spark.sql.streaming.ui.UIUtils.parseProgressTimestamp
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.kvstore.KVIndex
 
 /**
  * A customized StreamingQueryListener used in structured streaming UI, which 
contains all
  * UI data for both active and inactive query.
  * TODO: Add support for history server.
  */
-private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends 
StreamingQueryListener {
-
-  /**
-   * We use runId as the key here instead of id in active query status map,
-   * because the runId is unique for every started query, even it its a 
restart.
-   */
-  private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID, 
StreamingQueryUIData]()
-  private[ui] val inactiveQueryStatus = new 
mutable.Queue[StreamingQueryUIData]()
+private[sql] class StreamingQueryStatusListener(
+    conf: SparkConf,
+    store: ElementTrackingStore) extends StreamingQueryListener {
 
   private val streamingProgressRetention =
     conf.get(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES)
   private val inactiveQueryStatusRetention = 
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES)
 
+  store.addTrigger(classOf[StreamingQueryUIData], 
inactiveQueryStatusRetention) { count =>
+    cleanupInactiveQueries(count)
+  }
+
+  private def cleanupInactiveQueries(count: Long): Unit = {
+    val countToDelete = count - inactiveQueryStatusRetention
+    if (countToDelete <= 0) {
+      return
+    }
+
+    val view = 
store.view(classOf[StreamingQueryUIData]).index("startTimestamp").first(0L)
+    val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_.isActive == 
false)
+    toDelete.foreach { e =>
+      store.delete(e.getClass(), e.runId)
+      store.removeAllByIndexValues(
+        classOf[StreamingQueryProgressWrapper], "runId", e.runId.toString)
+    }
+  }
+
   override def onQueryStarted(event: 
StreamingQueryListener.QueryStartedEvent): Unit = {
     val startTimestamp = parseProgressTimestamp(event.timestamp)
-    activeQueryStatus.putIfAbsent(event.runId,
-      new StreamingQueryUIData(event.name, event.id, event.runId, 
startTimestamp))
+    val querySummary = new StreamingQueryUIData(
+      event.name,
+      event.id,
+      event.runId,
+      Queue.empty[String],
+      startTimestamp,
+      true,
+      None)
+    store.write(querySummary)
   }
 
   override def onQueryProgress(event: 
StreamingQueryListener.QueryProgressEvent): Unit = {
-    val batchTimestamp = parseProgressTimestamp(event.progress.timestamp)
-    val queryStatus = activeQueryStatus.getOrDefault(
-      event.progress.runId,
-      new StreamingQueryUIData(event.progress.name, event.progress.id, 
event.progress.runId,
-        batchTimestamp))
-    queryStatus.updateProcess(event.progress, streamingProgressRetention)
+    val runId = event.progress.runId
+    val batchId = event.progress.batchId
+    val timestamp = event.progress.timestamp
+    val querySummary = store.read(classOf[StreamingQueryUIData], runId)
+    val progressIdQueue =
+      querySummary.progressIdQueue ++ Seq(getUniqueId(runId, batchId, 
timestamp))
+    store.write(new StreamingQueryProgressWrapper(event.progress))
+    while(progressIdQueue.length >= streamingProgressRetention) {
+      val uniqueId = progressIdQueue.dequeue
+      store.delete(classOf[StreamingQueryProgressWrapper], uniqueId)
+    }
+    store.delete(classOf[StreamingQueryUIData], runId)
+    store.write(new StreamingQueryUIData(
+      querySummary.name,
+      querySummary.id,
+      querySummary.runId,
+      progressIdQueue,
+      querySummary.startTimestamp,
+      querySummary.isActive,
+      querySummary.exception
+    ))
   }
 
   override def onQueryTerminated(
       event: StreamingQueryListener.QueryTerminatedEvent): Unit = synchronized 
{
-    val queryStatus = activeQueryStatus.remove(event.runId)
-    if (queryStatus != null) {
-      queryStatus.queryTerminated(event)
-      inactiveQueryStatus += queryStatus
-      while (inactiveQueryStatus.length >= inactiveQueryStatusRetention) {
-        inactiveQueryStatus.dequeue()
-      }
-    }
-  }
-
-  def allQueryStatus: Seq[StreamingQueryUIData] = synchronized {
-    activeQueryStatus.values().asScala.toSeq ++ inactiveQueryStatus
+    val querySummary = store.read(classOf[StreamingQueryUIData], event.runId)
+    store.delete(classOf[StreamingQueryUIData], event.runId)
+    store.write(new StreamingQueryUIData(
+      querySummary.name,
+      querySummary.id,
+      querySummary.runId,
+      querySummary.progressIdQueue,
+      querySummary.startTimestamp,
+      false,
+      querySummary.exception
+    ))
   }
 }
 
 /**
  * This class contains all message related to UI display, each instance 
corresponds to a single
  * [[org.apache.spark.sql.streaming.StreamingQuery]].
  */
-private[ui] class StreamingQueryUIData(
+private[sql] class StreamingQueryUIData(
     val name: String,
     val id: UUID,
-    val runId: UUID,
-    val startTimestamp: Long) {
+    @KVIndexParam val runId: UUID,
+    val progressIdQueue: Queue[String],
+    val startTimestamp: Long,
+    val isActive: Boolean,
+    val exception: Option[String]) {
 
-  /** Holds the most recent query progress updates. */
-  private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
+  private var storeOption: Option[ElementTrackingStore] = None
 
-  private var _isActive = true
-  private var _exception: Option[String] = None
+  @JsonIgnore @KVIndex("startTimestamp")
+  private def startTimestampIndex: Long = startTimestamp
 
-  def isActive: Boolean = synchronized { _isActive }
-
-  def exception: Option[String] = synchronized { _exception }
-
-  def queryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): 
Unit = synchronized {
-    _isActive = false
-    _exception = event.exception
+  def recentProgress: Array[StreamingQueryProgress] = {
+    storeOption.map { store => progressIdQueue.map { uniqueId =>
+      store.read(classOf[StreamingQueryProgressWrapper], uniqueId).progress
+    }.toArray }.getOrElse(Array.empty)
   }
 
-  def updateProcess(
-      newProgress: StreamingQueryProgress, retentionNum: Int): Unit = 
progressBuffer.synchronized {
-    progressBuffer += newProgress
-    while (progressBuffer.length >= retentionNum) {
-      progressBuffer.dequeue()
+  def lastProgress: StreamingQueryProgress = {
+    if (progressIdQueue.nonEmpty) {
+      storeOption.map(_.read(classOf[StreamingQueryProgressWrapper], 
progressIdQueue.last).progress)
+        .orNull
+    } else {
+      null
     }
   }
 
-  def recentProgress: Array[StreamingQueryProgress] = 
progressBuffer.synchronized {
-    progressBuffer.toArray
+  def setKVStore(store: ElementTrackingStore): StreamingQueryUIData = {
+    storeOption = Option(store)
+    this
   }
+}
 
-  def lastProgress: StreamingQueryProgress = progressBuffer.synchronized {
-    progressBuffer.lastOption.orNull
+private[sql] class StreamingQueryProgressWrapper(val progress: 
StreamingQueryProgress) {
+  @KVIndexParam("batchId") val batchId: Long = progress.batchId
+  @KVIndexParam("runId") val runId: String = progress.runId.toString
+
+  @JsonIgnore @KVIndex
+  def uniqueId: String = getUniqueId(progress.runId, progress.batchId, 
progress.timestamp)
+}
+
+private[sql] object StreamingQueryProgressWrapper {
+  /**
+   * Adding `timestamp` into unique id to support reporting `empty` query 
progress
+   * when no data comes.
+   */
+  def getUniqueId(
+      runId: UUID,
+      batchId: Long,
+      timestamp: String): String = {
+    s"${runId}_${batchId}_$timestamp"

Review comment:
       The unique ID of a streaming query progress entry in the KVStore. It
consists of `runId`, `batchId`, and `timestamp`. We include `timestamp` to
distinguish empty progress reports that share the same `batchId`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to