zsxwing commented on a change in pull request #28781:
URL: https://github.com/apache/spark/pull/28781#discussion_r527140209
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
##########
@@ -20,102 +20,145 @@ package org.apache.spark.sql.streaming.ui
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
-import scala.collection.JavaConverters._
import scala.collection.mutable
+import com.fasterxml.jackson.annotation.JsonIgnore
+
import org.apache.spark.SparkConf
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.streaming.{StreamingQueryListener,
StreamingQueryProgress}
+import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper._
import org.apache.spark.sql.streaming.ui.UIUtils.parseProgressTimestamp
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.kvstore.KVIndex
/**
* A customized StreamingQueryListener used in structured streaming UI, which
contains all
* UI data for both active and inactive query.
- * TODO: Add support for history server.
*/
-private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends
StreamingQueryListener {
-
- /**
- * We use runId as the key here instead of id in active query status map,
- * because the runId is unique for every started query, even if it's a
restart.
- */
- private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID,
StreamingQueryUIData]()
- private[ui] val inactiveQueryStatus = new
mutable.Queue[StreamingQueryUIData]()
+private[sql] class StreamingQueryStatusListener(
+ conf: SparkConf,
+ store: ElementTrackingStore) extends StreamingQueryListener {
private val streamingProgressRetention =
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES)
private val inactiveQueryStatusRetention =
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES)
+ store.addTrigger(classOf[StreamingQueryData], inactiveQueryStatusRetention)
{ count =>
+ cleanupInactiveQueries(count)
+ }
+
+ private val queryToProgress = new ConcurrentHashMap[UUID,
mutable.Queue[String]]()
+
+ private def cleanupInactiveQueries(count: Long): Unit = {
+ val countToDelete = count - inactiveQueryStatusRetention
+ if (countToDelete <= 0) {
+ return
+ }
+
+ val view =
store.view(classOf[StreamingQueryData]).index("active").first(false).last(false)
+ val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_ => true)
+ val runIds = toDelete.map { e =>
+ store.delete(e.getClass, e.runId)
+ e.runId.toString
+ }
+ // Delete wrappers in one pass, as deleting them for each summary is slow
+ store.removeAllByIndexValues(classOf[StreamingQueryProgressWrapper],
"runId", runIds)
+ }
+
override def onQueryStarted(event:
StreamingQueryListener.QueryStartedEvent): Unit = {
val startTimestamp = parseProgressTimestamp(event.timestamp)
- activeQueryStatus.putIfAbsent(event.runId,
- new StreamingQueryUIData(event.name, event.id, event.runId,
startTimestamp))
+ store.write(new StreamingQueryData(
+ event.name,
+ event.id,
+ event.runId,
+ Array.empty[String],
+ startTimestamp,
+ true,
+ None
+ ))
}
override def onQueryProgress(event:
StreamingQueryListener.QueryProgressEvent): Unit = {
- val batchTimestamp = parseProgressTimestamp(event.progress.timestamp)
- val queryStatus = activeQueryStatus.getOrDefault(
- event.progress.runId,
- new StreamingQueryUIData(event.progress.name, event.progress.id,
event.progress.runId,
- batchTimestamp))
- queryStatus.updateProcess(event.progress, streamingProgressRetention)
+ val runId = event.progress.runId
+ val batchId = event.progress.batchId
+ val timestamp = event.progress.timestamp
+ if (!queryToProgress.contains(event.progress.runId)) {
+ queryToProgress.put(event.progress.runId, mutable.Queue.empty[String])
+ }
+ val progressIds = queryToProgress.get(event.progress.runId)
Review comment:
nit: can we add a comment to explain why we don't need `synchronized`
here, such as
```
Events from the same query run will never be processed concurrently, so it's
safe to access `progressIds` without any protection.
```
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
##########
@@ -31,16 +32,21 @@ import org.apache.spark.util.ListenerBus
* Spark listener bus, so that it can receive
[[StreamingQueryListener.Event]]s and dispatch them
* to StreamingQueryListeners.
*
- * Note that each bus and its registered listeners are associated with a
single SparkSession
+ * Note 1: Each bus and its registered listeners are associated with a single
SparkSession
* and StreamingQueryManager. So this bus will dispatch events to registered
listeners for only
* those queries that were started in the associated SparkSession.
+ *
+ * Note 2: To rebuild Structured Streaming UI in SHS, this bus will be
registered into
+ * [[ReplayListenerBus]]. We use the `live` argument (true in default) to
determine how to process
[[StreamingQueryListener.Event]]. If `live` is false, it means this bus is
used to replay all
streaming query events from the event log.
*/
-class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
+class StreamingQueryListenerBus(sparkListenerBus: Option[LiveListenerBus],
live: Boolean = true)
Review comment:
nit: We don't need the `live` parameter. We can get `live` by checking
`sparkListenerBus.nonEmpty`.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
##########
@@ -20,102 +20,145 @@ package org.apache.spark.sql.streaming.ui
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
-import scala.collection.JavaConverters._
import scala.collection.mutable
+import com.fasterxml.jackson.annotation.JsonIgnore
+
import org.apache.spark.SparkConf
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.streaming.{StreamingQueryListener,
StreamingQueryProgress}
+import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper._
import org.apache.spark.sql.streaming.ui.UIUtils.parseProgressTimestamp
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.kvstore.KVIndex
/**
* A customized StreamingQueryListener used in structured streaming UI, which
contains all
* UI data for both active and inactive query.
- * TODO: Add support for history server.
*/
-private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends
StreamingQueryListener {
-
- /**
- * We use runId as the key here instead of id in active query status map,
- * because the runId is unique for every started query, even if it's a
restart.
- */
- private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID,
StreamingQueryUIData]()
- private[ui] val inactiveQueryStatus = new
mutable.Queue[StreamingQueryUIData]()
+private[sql] class StreamingQueryStatusListener(
+ conf: SparkConf,
+ store: ElementTrackingStore) extends StreamingQueryListener {
private val streamingProgressRetention =
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES)
private val inactiveQueryStatusRetention =
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES)
+ store.addTrigger(classOf[StreamingQueryData], inactiveQueryStatusRetention)
{ count =>
+ cleanupInactiveQueries(count)
+ }
+
+ private val queryToProgress = new ConcurrentHashMap[UUID,
mutable.Queue[String]]()
+
+ private def cleanupInactiveQueries(count: Long): Unit = {
+ val countToDelete = count - inactiveQueryStatusRetention
+ if (countToDelete <= 0) {
+ return
+ }
+
+ val view =
store.view(classOf[StreamingQueryData]).index("active").first(false).last(false)
+ val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_ => true)
+ val runIds = toDelete.map { e =>
+ store.delete(e.getClass, e.runId)
+ e.runId.toString
+ }
+ // Delete wrappers in one pass, as deleting them for each summary is slow
+ store.removeAllByIndexValues(classOf[StreamingQueryProgressWrapper],
"runId", runIds)
+ }
+
override def onQueryStarted(event:
StreamingQueryListener.QueryStartedEvent): Unit = {
val startTimestamp = parseProgressTimestamp(event.timestamp)
- activeQueryStatus.putIfAbsent(event.runId,
- new StreamingQueryUIData(event.name, event.id, event.runId,
startTimestamp))
+ store.write(new StreamingQueryData(
+ event.name,
+ event.id,
+ event.runId,
+ Array.empty[String],
+ startTimestamp,
+ true,
+ None
+ ))
}
override def onQueryProgress(event:
StreamingQueryListener.QueryProgressEvent): Unit = {
- val batchTimestamp = parseProgressTimestamp(event.progress.timestamp)
- val queryStatus = activeQueryStatus.getOrDefault(
- event.progress.runId,
- new StreamingQueryUIData(event.progress.name, event.progress.id,
event.progress.runId,
- batchTimestamp))
- queryStatus.updateProcess(event.progress, streamingProgressRetention)
+ val runId = event.progress.runId
+ val batchId = event.progress.batchId
+ val timestamp = event.progress.timestamp
+ if (!queryToProgress.contains(event.progress.runId)) {
+ queryToProgress.put(event.progress.runId, mutable.Queue.empty[String])
+ }
+ val progressIds = queryToProgress.get(event.progress.runId)
+ progressIds.enqueue(getUniqueId(runId, batchId, timestamp))
+ store.write(new StreamingQueryProgressWrapper(event.progress))
+ while(progressIds.length >= streamingProgressRetention) {
+ val uniqueId = progressIds.dequeue
+ store.delete(classOf[StreamingQueryProgressWrapper], uniqueId)
+ }
}
override def onQueryTerminated(
event: StreamingQueryListener.QueryTerminatedEvent): Unit = synchronized
{
Review comment:
nit: `synchronized` is not needed.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
##########
@@ -20,102 +20,145 @@ package org.apache.spark.sql.streaming.ui
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
-import scala.collection.JavaConverters._
import scala.collection.mutable
+import com.fasterxml.jackson.annotation.JsonIgnore
+
import org.apache.spark.SparkConf
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.streaming.{StreamingQueryListener,
StreamingQueryProgress}
+import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper._
import org.apache.spark.sql.streaming.ui.UIUtils.parseProgressTimestamp
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.kvstore.KVIndex
/**
* A customized StreamingQueryListener used in structured streaming UI, which
contains all
* UI data for both active and inactive query.
- * TODO: Add support for history server.
*/
-private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends
StreamingQueryListener {
-
- /**
- * We use runId as the key here instead of id in active query status map,
- * because the runId is unique for every started query, even if it's a
restart.
- */
- private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID,
StreamingQueryUIData]()
- private[ui] val inactiveQueryStatus = new
mutable.Queue[StreamingQueryUIData]()
+private[sql] class StreamingQueryStatusListener(
+ conf: SparkConf,
+ store: ElementTrackingStore) extends StreamingQueryListener {
private val streamingProgressRetention =
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES)
private val inactiveQueryStatusRetention =
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES)
+ store.addTrigger(classOf[StreamingQueryData], inactiveQueryStatusRetention)
{ count =>
+ cleanupInactiveQueries(count)
+ }
+
+ private val queryToProgress = new ConcurrentHashMap[UUID,
mutable.Queue[String]]()
+
+ private def cleanupInactiveQueries(count: Long): Unit = {
+ val countToDelete = count - inactiveQueryStatusRetention
+ if (countToDelete <= 0) {
+ return
+ }
+
+ val view =
store.view(classOf[StreamingQueryData]).index("active").first(false).last(false)
+ val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_ => true)
+ val runIds = toDelete.map { e =>
+ store.delete(e.getClass, e.runId)
+ e.runId.toString
+ }
+ // Delete wrappers in one pass, as deleting them for each summary is slow
+ store.removeAllByIndexValues(classOf[StreamingQueryProgressWrapper],
"runId", runIds)
+ }
+
override def onQueryStarted(event:
StreamingQueryListener.QueryStartedEvent): Unit = {
val startTimestamp = parseProgressTimestamp(event.timestamp)
- activeQueryStatus.putIfAbsent(event.runId,
- new StreamingQueryUIData(event.name, event.id, event.runId,
startTimestamp))
+ store.write(new StreamingQueryData(
+ event.name,
+ event.id,
+ event.runId,
+ Array.empty[String],
+ startTimestamp,
+ true,
Review comment:
nit: `isActive = true`
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
##########
@@ -20,102 +20,145 @@ package org.apache.spark.sql.streaming.ui
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
-import scala.collection.JavaConverters._
import scala.collection.mutable
+import com.fasterxml.jackson.annotation.JsonIgnore
+
import org.apache.spark.SparkConf
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.streaming.{StreamingQueryListener,
StreamingQueryProgress}
+import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper._
import org.apache.spark.sql.streaming.ui.UIUtils.parseProgressTimestamp
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.kvstore.KVIndex
/**
* A customized StreamingQueryListener used in structured streaming UI, which
contains all
* UI data for both active and inactive query.
- * TODO: Add support for history server.
*/
-private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends
StreamingQueryListener {
-
- /**
- * We use runId as the key here instead of id in active query status map,
- * because the runId is unique for every started query, even it its a
restart.
- */
- private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID,
StreamingQueryUIData]()
- private[ui] val inactiveQueryStatus = new
mutable.Queue[StreamingQueryUIData]()
+private[sql] class StreamingQueryStatusListener(
+ conf: SparkConf,
+ store: ElementTrackingStore) extends StreamingQueryListener {
private val streamingProgressRetention =
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES)
private val inactiveQueryStatusRetention =
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES)
+ store.addTrigger(classOf[StreamingQueryData], inactiveQueryStatusRetention)
{ count =>
+ cleanupInactiveQueries(count)
+ }
+
+ private val queryToProgress = new ConcurrentHashMap[UUID,
mutable.Queue[String]]()
+
+ private def cleanupInactiveQueries(count: Long): Unit = {
+ val countToDelete = count - inactiveQueryStatusRetention
+ if (countToDelete <= 0) {
+ return
+ }
+
+ val view =
store.view(classOf[StreamingQueryData]).index("active").first(false).last(false)
+ val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_ => true)
+ val runIds = toDelete.map { e =>
+ store.delete(e.getClass, e.runId)
+ e.runId.toString
+ }
+ // Delete wrappers in one pass, as deleting them for each summary is slow
+ store.removeAllByIndexValues(classOf[StreamingQueryProgressWrapper],
"runId", runIds)
+ }
+
override def onQueryStarted(event:
StreamingQueryListener.QueryStartedEvent): Unit = {
val startTimestamp = parseProgressTimestamp(event.timestamp)
- activeQueryStatus.putIfAbsent(event.runId,
- new StreamingQueryUIData(event.name, event.id, event.runId,
startTimestamp))
+ store.write(new StreamingQueryData(
+ event.name,
+ event.id,
+ event.runId,
+ Array.empty[String],
+ startTimestamp,
+ true,
+ None
+ ))
}
override def onQueryProgress(event:
StreamingQueryListener.QueryProgressEvent): Unit = {
- val batchTimestamp = parseProgressTimestamp(event.progress.timestamp)
- val queryStatus = activeQueryStatus.getOrDefault(
- event.progress.runId,
- new StreamingQueryUIData(event.progress.name, event.progress.id,
event.progress.runId,
- batchTimestamp))
- queryStatus.updateProcess(event.progress, streamingProgressRetention)
+ val runId = event.progress.runId
+ val batchId = event.progress.batchId
+ val timestamp = event.progress.timestamp
+ if (!queryToProgress.contains(event.progress.runId)) {
+ queryToProgress.put(event.progress.runId, mutable.Queue.empty[String])
+ }
+ val progressIds = queryToProgress.get(event.progress.runId)
+ progressIds.enqueue(getUniqueId(runId, batchId, timestamp))
+ store.write(new StreamingQueryProgressWrapper(event.progress))
+ while(progressIds.length >= streamingProgressRetention) {
+ val uniqueId = progressIds.dequeue
+ store.delete(classOf[StreamingQueryProgressWrapper], uniqueId)
+ }
}
override def onQueryTerminated(
event: StreamingQueryListener.QueryTerminatedEvent): Unit = synchronized
{
- val queryStatus = activeQueryStatus.remove(event.runId)
- if (queryStatus != null) {
- queryStatus.queryTerminated(event)
- inactiveQueryStatus += queryStatus
- while (inactiveQueryStatus.length >= inactiveQueryStatusRetention) {
- inactiveQueryStatus.dequeue()
- }
- }
+ val querySummary = store.read(classOf[StreamingQueryData], event.runId)
+ store.write(new StreamingQueryData(
+ querySummary.name,
+ querySummary.id,
+ querySummary.runId,
+ querySummary.progressIds,
+ querySummary.startTimestamp,
+ false,
Review comment:
nit: `isActive = false`
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
##########
@@ -20,102 +20,145 @@ package org.apache.spark.sql.streaming.ui
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
-import scala.collection.JavaConverters._
import scala.collection.mutable
+import com.fasterxml.jackson.annotation.JsonIgnore
+
import org.apache.spark.SparkConf
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.streaming.{StreamingQueryListener,
StreamingQueryProgress}
+import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper._
import org.apache.spark.sql.streaming.ui.UIUtils.parseProgressTimestamp
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.kvstore.KVIndex
/**
* A customized StreamingQueryListener used in structured streaming UI, which
contains all
* UI data for both active and inactive query.
- * TODO: Add support for history server.
*/
-private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends
StreamingQueryListener {
-
- /**
- * We use runId as the key here instead of id in active query status map,
- * because the runId is unique for every started query, even if it's a
restart.
- */
- private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID,
StreamingQueryUIData]()
- private[ui] val inactiveQueryStatus = new
mutable.Queue[StreamingQueryUIData]()
+private[sql] class StreamingQueryStatusListener(
+ conf: SparkConf,
+ store: ElementTrackingStore) extends StreamingQueryListener {
private val streamingProgressRetention =
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES)
private val inactiveQueryStatusRetention =
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES)
+ store.addTrigger(classOf[StreamingQueryData], inactiveQueryStatusRetention)
{ count =>
+ cleanupInactiveQueries(count)
+ }
+
+ private val queryToProgress = new ConcurrentHashMap[UUID,
mutable.Queue[String]]()
+
+ private def cleanupInactiveQueries(count: Long): Unit = {
+ val countToDelete = count - inactiveQueryStatusRetention
+ if (countToDelete <= 0) {
+ return
+ }
+
+ val view =
store.view(classOf[StreamingQueryData]).index("active").first(false).last(false)
+ val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_ => true)
+ val runIds = toDelete.map { e =>
+ store.delete(e.getClass, e.runId)
+ e.runId.toString
+ }
+ // Delete wrappers in one pass, as deleting them for each summary is slow
+ store.removeAllByIndexValues(classOf[StreamingQueryProgressWrapper],
"runId", runIds)
+ }
+
override def onQueryStarted(event:
StreamingQueryListener.QueryStartedEvent): Unit = {
val startTimestamp = parseProgressTimestamp(event.timestamp)
- activeQueryStatus.putIfAbsent(event.runId,
- new StreamingQueryUIData(event.name, event.id, event.runId,
startTimestamp))
+ store.write(new StreamingQueryData(
+ event.name,
+ event.id,
+ event.runId,
+ Array.empty[String],
+ startTimestamp,
+ true,
+ None
+ ))
}
override def onQueryProgress(event:
StreamingQueryListener.QueryProgressEvent): Unit = {
- val batchTimestamp = parseProgressTimestamp(event.progress.timestamp)
- val queryStatus = activeQueryStatus.getOrDefault(
- event.progress.runId,
- new StreamingQueryUIData(event.progress.name, event.progress.id,
event.progress.runId,
- batchTimestamp))
- queryStatus.updateProcess(event.progress, streamingProgressRetention)
+ val runId = event.progress.runId
+ val batchId = event.progress.batchId
+ val timestamp = event.progress.timestamp
+ if (!queryToProgress.contains(event.progress.runId)) {
Review comment:
nit: you can use `putIfAbsent`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]