zsxwing commented on a change in pull request #28781:
URL: https://github.com/apache/spark/pull/28781#discussion_r527140209



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
##########
@@ -20,102 +20,145 @@ package org.apache.spark.sql.streaming.ui
 import java.util.UUID
 import java.util.concurrent.ConcurrentHashMap
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable
 
+import com.fasterxml.jackson.annotation.JsonIgnore
+
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.streaming.{StreamingQueryListener, 
StreamingQueryProgress}
+import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper._
 import org.apache.spark.sql.streaming.ui.UIUtils.parseProgressTimestamp
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.kvstore.KVIndex
 
 /**
  * A customized StreamingQueryListener used in structured streaming UI, which 
contains all
  * UI data for both active and inactive query.
- * TODO: Add support for history server.
  */
-private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends 
StreamingQueryListener {
-
-  /**
-   * We use runId as the key here instead of id in active query status map,
-   * because the runId is unique for every started query, even if it's a 
restart.
-   */
-  private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID, 
StreamingQueryUIData]()
-  private[ui] val inactiveQueryStatus = new 
mutable.Queue[StreamingQueryUIData]()
+private[sql] class StreamingQueryStatusListener(
+    conf: SparkConf,
+    store: ElementTrackingStore) extends StreamingQueryListener {
 
   private val streamingProgressRetention =
     conf.get(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES)
   private val inactiveQueryStatusRetention = 
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES)
 
+  store.addTrigger(classOf[StreamingQueryData], inactiveQueryStatusRetention) 
{ count =>
+    cleanupInactiveQueries(count)
+  }
+
+  private val queryToProgress = new ConcurrentHashMap[UUID, 
mutable.Queue[String]]()
+
+  private def cleanupInactiveQueries(count: Long): Unit = {
+    val countToDelete = count - inactiveQueryStatusRetention
+    if (countToDelete <= 0) {
+      return
+    }
+
+    val view = 
store.view(classOf[StreamingQueryData]).index("active").first(false).last(false)
+    val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_ => true)
+    val runIds = toDelete.map { e =>
+      store.delete(e.getClass, e.runId)
+      e.runId.toString
+    }
+    // Delete wrappers in one pass, as deleting them for each summary is slow
+    store.removeAllByIndexValues(classOf[StreamingQueryProgressWrapper], 
"runId", runIds)
+  }
+
   override def onQueryStarted(event: 
StreamingQueryListener.QueryStartedEvent): Unit = {
     val startTimestamp = parseProgressTimestamp(event.timestamp)
-    activeQueryStatus.putIfAbsent(event.runId,
-      new StreamingQueryUIData(event.name, event.id, event.runId, 
startTimestamp))
+    store.write(new StreamingQueryData(
+      event.name,
+      event.id,
+      event.runId,
+      Array.empty[String],
+      startTimestamp,
+      true,
+      None
+    ))
   }
 
   override def onQueryProgress(event: 
StreamingQueryListener.QueryProgressEvent): Unit = {
-    val batchTimestamp = parseProgressTimestamp(event.progress.timestamp)
-    val queryStatus = activeQueryStatus.getOrDefault(
-      event.progress.runId,
-      new StreamingQueryUIData(event.progress.name, event.progress.id, 
event.progress.runId,
-        batchTimestamp))
-    queryStatus.updateProcess(event.progress, streamingProgressRetention)
+    val runId = event.progress.runId
+    val batchId = event.progress.batchId
+    val timestamp = event.progress.timestamp
+    if (!queryToProgress.contains(event.progress.runId)) {
+      queryToProgress.put(event.progress.runId, mutable.Queue.empty[String])
+    }
+    val progressIds = queryToProgress.get(event.progress.runId)

Review comment:
       nit: can we add a comment to explain why we don't need `synchronized` 
here, such as
   ```
   Events from the same query run will never be processed concurrently, so it's 
safe to access `progressIds` without any protection.
   ```

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
##########
@@ -31,16 +32,21 @@ import org.apache.spark.util.ListenerBus
  * Spark listener bus, so that it can receive 
[[StreamingQueryListener.Event]]s and dispatch them
  * to StreamingQueryListeners.
  *
- * Note that each bus and its registered listeners are associated with a 
single SparkSession
+ * Note 1: Each bus and its registered listeners are associated with a single 
SparkSession
  * and StreamingQueryManager. So this bus will dispatch events to registered 
listeners for only
  * those queries that were started in the associated SparkSession.
+ *
+ * Note 2: To rebuild Structured Streaming UI in SHS, this bus will be 
registered into
+ * [[ReplayListenerBus]]. We use the `live` argument (true by default) to 
determine how to process
+ * [[StreamingQueryListener.Event]]. If `live` is false, it means this bus is 
used to replay all
+ * streaming query events from the event log.
  */
-class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
+class StreamingQueryListenerBus(sparkListenerBus: Option[LiveListenerBus], 
live: Boolean = true)

Review comment:
       nit: We don't need the `live` parameter. We can get `live` by checking 
`sparkListenerBus.nonEmpty`.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
##########
@@ -20,102 +20,145 @@ package org.apache.spark.sql.streaming.ui
 import java.util.UUID
 import java.util.concurrent.ConcurrentHashMap
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable
 
+import com.fasterxml.jackson.annotation.JsonIgnore
+
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.streaming.{StreamingQueryListener, 
StreamingQueryProgress}
+import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper._
 import org.apache.spark.sql.streaming.ui.UIUtils.parseProgressTimestamp
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.kvstore.KVIndex
 
 /**
  * A customized StreamingQueryListener used in structured streaming UI, which 
contains all
  * UI data for both active and inactive query.
- * TODO: Add support for history server.
  */
-private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends 
StreamingQueryListener {
-
-  /**
-   * We use runId as the key here instead of id in active query status map,
-   * because the runId is unique for every started query, even it its a 
restart.
-   */
-  private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID, 
StreamingQueryUIData]()
-  private[ui] val inactiveQueryStatus = new 
mutable.Queue[StreamingQueryUIData]()
+private[sql] class StreamingQueryStatusListener(
+    conf: SparkConf,
+    store: ElementTrackingStore) extends StreamingQueryListener {
 
   private val streamingProgressRetention =
     conf.get(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES)
   private val inactiveQueryStatusRetention = 
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES)
 
+  store.addTrigger(classOf[StreamingQueryData], inactiveQueryStatusRetention) 
{ count =>
+    cleanupInactiveQueries(count)
+  }
+
+  private val queryToProgress = new ConcurrentHashMap[UUID, 
mutable.Queue[String]]()
+
+  private def cleanupInactiveQueries(count: Long): Unit = {
+    val countToDelete = count - inactiveQueryStatusRetention
+    if (countToDelete <= 0) {
+      return
+    }
+
+    val view = 
store.view(classOf[StreamingQueryData]).index("active").first(false).last(false)
+    val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_ => true)
+    val runIds = toDelete.map { e =>
+      store.delete(e.getClass, e.runId)
+      e.runId.toString
+    }
+    // Delete wrappers in one pass, as deleting them for each summary is slow
+    store.removeAllByIndexValues(classOf[StreamingQueryProgressWrapper], 
"runId", runIds)
+  }
+
   override def onQueryStarted(event: 
StreamingQueryListener.QueryStartedEvent): Unit = {
     val startTimestamp = parseProgressTimestamp(event.timestamp)
-    activeQueryStatus.putIfAbsent(event.runId,
-      new StreamingQueryUIData(event.name, event.id, event.runId, 
startTimestamp))
+    store.write(new StreamingQueryData(
+      event.name,
+      event.id,
+      event.runId,
+      Array.empty[String],
+      startTimestamp,
+      true,
+      None
+    ))
   }
 
   override def onQueryProgress(event: 
StreamingQueryListener.QueryProgressEvent): Unit = {
-    val batchTimestamp = parseProgressTimestamp(event.progress.timestamp)
-    val queryStatus = activeQueryStatus.getOrDefault(
-      event.progress.runId,
-      new StreamingQueryUIData(event.progress.name, event.progress.id, 
event.progress.runId,
-        batchTimestamp))
-    queryStatus.updateProcess(event.progress, streamingProgressRetention)
+    val runId = event.progress.runId
+    val batchId = event.progress.batchId
+    val timestamp = event.progress.timestamp
+    if (!queryToProgress.contains(event.progress.runId)) {
+      queryToProgress.put(event.progress.runId, mutable.Queue.empty[String])
+    }
+    val progressIds = queryToProgress.get(event.progress.runId)
+    progressIds.enqueue(getUniqueId(runId, batchId, timestamp))
+    store.write(new StreamingQueryProgressWrapper(event.progress))
+    while(progressIds.length >= streamingProgressRetention) {
+      val uniqueId = progressIds.dequeue
+      store.delete(classOf[StreamingQueryProgressWrapper], uniqueId)
+    }
   }
 
   override def onQueryTerminated(
       event: StreamingQueryListener.QueryTerminatedEvent): Unit = synchronized 
{

Review comment:
       nit: `synchronized` is not needed.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
##########
@@ -20,102 +20,145 @@ package org.apache.spark.sql.streaming.ui
 import java.util.UUID
 import java.util.concurrent.ConcurrentHashMap
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable
 
+import com.fasterxml.jackson.annotation.JsonIgnore
+
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.streaming.{StreamingQueryListener, 
StreamingQueryProgress}
+import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper._
 import org.apache.spark.sql.streaming.ui.UIUtils.parseProgressTimestamp
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.kvstore.KVIndex
 
 /**
  * A customized StreamingQueryListener used in structured streaming UI, which 
contains all
  * UI data for both active and inactive query.
- * TODO: Add support for history server.
  */
-private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends 
StreamingQueryListener {
-
-  /**
-   * We use runId as the key here instead of id in active query status map,
-   * because the runId is unique for every started query, even if it's a 
restart.
-   */
-  private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID, 
StreamingQueryUIData]()
-  private[ui] val inactiveQueryStatus = new 
mutable.Queue[StreamingQueryUIData]()
+private[sql] class StreamingQueryStatusListener(
+    conf: SparkConf,
+    store: ElementTrackingStore) extends StreamingQueryListener {
 
   private val streamingProgressRetention =
     conf.get(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES)
   private val inactiveQueryStatusRetention = 
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES)
 
+  store.addTrigger(classOf[StreamingQueryData], inactiveQueryStatusRetention) 
{ count =>
+    cleanupInactiveQueries(count)
+  }
+
+  private val queryToProgress = new ConcurrentHashMap[UUID, 
mutable.Queue[String]]()
+
+  private def cleanupInactiveQueries(count: Long): Unit = {
+    val countToDelete = count - inactiveQueryStatusRetention
+    if (countToDelete <= 0) {
+      return
+    }
+
+    val view = 
store.view(classOf[StreamingQueryData]).index("active").first(false).last(false)
+    val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_ => true)
+    val runIds = toDelete.map { e =>
+      store.delete(e.getClass, e.runId)
+      e.runId.toString
+    }
+    // Delete wrappers in one pass, as deleting them for each summary is slow
+    store.removeAllByIndexValues(classOf[StreamingQueryProgressWrapper], 
"runId", runIds)
+  }
+
   override def onQueryStarted(event: 
StreamingQueryListener.QueryStartedEvent): Unit = {
     val startTimestamp = parseProgressTimestamp(event.timestamp)
-    activeQueryStatus.putIfAbsent(event.runId,
-      new StreamingQueryUIData(event.name, event.id, event.runId, 
startTimestamp))
+    store.write(new StreamingQueryData(
+      event.name,
+      event.id,
+      event.runId,
+      Array.empty[String],
+      startTimestamp,
+      true,

Review comment:
       nit: `isActive = true`

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
##########
@@ -20,102 +20,145 @@ package org.apache.spark.sql.streaming.ui
 import java.util.UUID
 import java.util.concurrent.ConcurrentHashMap
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable
 
+import com.fasterxml.jackson.annotation.JsonIgnore
+
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.streaming.{StreamingQueryListener, 
StreamingQueryProgress}
+import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper._
 import org.apache.spark.sql.streaming.ui.UIUtils.parseProgressTimestamp
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.kvstore.KVIndex
 
 /**
  * A customized StreamingQueryListener used in structured streaming UI, which 
contains all
  * UI data for both active and inactive query.
- * TODO: Add support for history server.
  */
-private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends 
StreamingQueryListener {
-
-  /**
-   * We use runId as the key here instead of id in active query status map,
-   * because the runId is unique for every started query, even if it's a 
restart.
-   */
-  private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID, 
StreamingQueryUIData]()
-  private[ui] val inactiveQueryStatus = new 
mutable.Queue[StreamingQueryUIData]()
+private[sql] class StreamingQueryStatusListener(
+    conf: SparkConf,
+    store: ElementTrackingStore) extends StreamingQueryListener {
 
   private val streamingProgressRetention =
     conf.get(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES)
   private val inactiveQueryStatusRetention = 
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES)
 
+  store.addTrigger(classOf[StreamingQueryData], inactiveQueryStatusRetention) 
{ count =>
+    cleanupInactiveQueries(count)
+  }
+
+  private val queryToProgress = new ConcurrentHashMap[UUID, 
mutable.Queue[String]]()
+
+  private def cleanupInactiveQueries(count: Long): Unit = {
+    val countToDelete = count - inactiveQueryStatusRetention
+    if (countToDelete <= 0) {
+      return
+    }
+
+    val view = 
store.view(classOf[StreamingQueryData]).index("active").first(false).last(false)
+    val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_ => true)
+    val runIds = toDelete.map { e =>
+      store.delete(e.getClass, e.runId)
+      e.runId.toString
+    }
+    // Delete wrappers in one pass, as deleting them for each summary is slow
+    store.removeAllByIndexValues(classOf[StreamingQueryProgressWrapper], 
"runId", runIds)
+  }
+
   override def onQueryStarted(event: 
StreamingQueryListener.QueryStartedEvent): Unit = {
     val startTimestamp = parseProgressTimestamp(event.timestamp)
-    activeQueryStatus.putIfAbsent(event.runId,
-      new StreamingQueryUIData(event.name, event.id, event.runId, 
startTimestamp))
+    store.write(new StreamingQueryData(
+      event.name,
+      event.id,
+      event.runId,
+      Array.empty[String],
+      startTimestamp,
+      true,
+      None
+    ))
   }
 
   override def onQueryProgress(event: 
StreamingQueryListener.QueryProgressEvent): Unit = {
-    val batchTimestamp = parseProgressTimestamp(event.progress.timestamp)
-    val queryStatus = activeQueryStatus.getOrDefault(
-      event.progress.runId,
-      new StreamingQueryUIData(event.progress.name, event.progress.id, 
event.progress.runId,
-        batchTimestamp))
-    queryStatus.updateProcess(event.progress, streamingProgressRetention)
+    val runId = event.progress.runId
+    val batchId = event.progress.batchId
+    val timestamp = event.progress.timestamp
+    if (!queryToProgress.contains(event.progress.runId)) {
+      queryToProgress.put(event.progress.runId, mutable.Queue.empty[String])
+    }
+    val progressIds = queryToProgress.get(event.progress.runId)
+    progressIds.enqueue(getUniqueId(runId, batchId, timestamp))
+    store.write(new StreamingQueryProgressWrapper(event.progress))
+    while(progressIds.length >= streamingProgressRetention) {
+      val uniqueId = progressIds.dequeue
+      store.delete(classOf[StreamingQueryProgressWrapper], uniqueId)
+    }
   }
 
   override def onQueryTerminated(
       event: StreamingQueryListener.QueryTerminatedEvent): Unit = synchronized 
{
-    val queryStatus = activeQueryStatus.remove(event.runId)
-    if (queryStatus != null) {
-      queryStatus.queryTerminated(event)
-      inactiveQueryStatus += queryStatus
-      while (inactiveQueryStatus.length >= inactiveQueryStatusRetention) {
-        inactiveQueryStatus.dequeue()
-      }
-    }
+    val querySummary = store.read(classOf[StreamingQueryData], event.runId)
+    store.write(new StreamingQueryData(
+      querySummary.name,
+      querySummary.id,
+      querySummary.runId,
+      querySummary.progressIds,
+      querySummary.startTimestamp,
+      false,

Review comment:
       nit: `isActive = false`

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
##########
@@ -20,102 +20,145 @@ package org.apache.spark.sql.streaming.ui
 import java.util.UUID
 import java.util.concurrent.ConcurrentHashMap
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable
 
+import com.fasterxml.jackson.annotation.JsonIgnore
+
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.streaming.{StreamingQueryListener, 
StreamingQueryProgress}
+import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper._
 import org.apache.spark.sql.streaming.ui.UIUtils.parseProgressTimestamp
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.kvstore.KVIndex
 
 /**
  * A customized StreamingQueryListener used in structured streaming UI, which 
contains all
  * UI data for both active and inactive query.
- * TODO: Add support for history server.
  */
-private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends 
StreamingQueryListener {
-
-  /**
-   * We use runId as the key here instead of id in active query status map,
-   * because the runId is unique for every started query, even if it's a 
restart.
-   */
-  private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID, 
StreamingQueryUIData]()
-  private[ui] val inactiveQueryStatus = new 
mutable.Queue[StreamingQueryUIData]()
+private[sql] class StreamingQueryStatusListener(
+    conf: SparkConf,
+    store: ElementTrackingStore) extends StreamingQueryListener {
 
   private val streamingProgressRetention =
     conf.get(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES)
   private val inactiveQueryStatusRetention = 
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES)
 
+  store.addTrigger(classOf[StreamingQueryData], inactiveQueryStatusRetention) 
{ count =>
+    cleanupInactiveQueries(count)
+  }
+
+  private val queryToProgress = new ConcurrentHashMap[UUID, 
mutable.Queue[String]]()
+
+  private def cleanupInactiveQueries(count: Long): Unit = {
+    val countToDelete = count - inactiveQueryStatusRetention
+    if (countToDelete <= 0) {
+      return
+    }
+
+    val view = 
store.view(classOf[StreamingQueryData]).index("active").first(false).last(false)
+    val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_ => true)
+    val runIds = toDelete.map { e =>
+      store.delete(e.getClass, e.runId)
+      e.runId.toString
+    }
+    // Delete wrappers in one pass, as deleting them for each summary is slow
+    store.removeAllByIndexValues(classOf[StreamingQueryProgressWrapper], 
"runId", runIds)
+  }
+
   override def onQueryStarted(event: 
StreamingQueryListener.QueryStartedEvent): Unit = {
     val startTimestamp = parseProgressTimestamp(event.timestamp)
-    activeQueryStatus.putIfAbsent(event.runId,
-      new StreamingQueryUIData(event.name, event.id, event.runId, 
startTimestamp))
+    store.write(new StreamingQueryData(
+      event.name,
+      event.id,
+      event.runId,
+      Array.empty[String],
+      startTimestamp,
+      true,
+      None
+    ))
   }
 
   override def onQueryProgress(event: 
StreamingQueryListener.QueryProgressEvent): Unit = {
-    val batchTimestamp = parseProgressTimestamp(event.progress.timestamp)
-    val queryStatus = activeQueryStatus.getOrDefault(
-      event.progress.runId,
-      new StreamingQueryUIData(event.progress.name, event.progress.id, 
event.progress.runId,
-        batchTimestamp))
-    queryStatus.updateProcess(event.progress, streamingProgressRetention)
+    val runId = event.progress.runId
+    val batchId = event.progress.batchId
+    val timestamp = event.progress.timestamp
+    if (!queryToProgress.contains(event.progress.runId)) {

Review comment:
       nit: you can use `putIfAbsent`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to