uncleGen commented on a change in pull request #28781:
URL: https://github.com/apache/spark/pull/28781#discussion_r509034217
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
##########
@@ -18,104 +18,139 @@
package org.apache.spark.sql.streaming.ui
import java.util.UUID
-import java.util.concurrent.ConcurrentHashMap
-import scala.collection.JavaConverters._
import scala.collection.mutable
+import com.fasterxml.jackson.annotation.JsonIgnore
+
import org.apache.spark.SparkConf
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}
+import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper._
import org.apache.spark.sql.streaming.ui.UIUtils.parseProgressTimestamp
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.kvstore.KVIndex
/**
* A customized StreamingQueryListener used in structured streaming UI, which contains all
* UI data for both active and inactive query.
- * TODO: Add support for history server.
*/
-private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends StreamingQueryListener {
-
- /**
- * We use runId as the key here instead of id in active query status map,
- * because the runId is unique for every started query, even it its a restart.
- */
- private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID, StreamingQueryUIData]()
- private[ui] val inactiveQueryStatus = new mutable.Queue[StreamingQueryUIData]()
+private[sql] class StreamingQueryStatusListener(
+ conf: SparkConf,
+ store: ElementTrackingStore) extends StreamingQueryListener {
private val streamingProgressRetention =
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES)
private val inactiveQueryStatusRetention =
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES)
+ store.addTrigger(classOf[StreamingQuerySummary], inactiveQueryStatusRetention) { count =>
+ cleanupInactiveQueries(count)
+ }
+
+ private val queryToProgress = new mutable.HashMap[UUID, mutable.Queue[String]]()
+
+ private def cleanupInactiveQueries(count: Long): Unit = {
+ val countToDelete = count - inactiveQueryStatusRetention
+ if (countToDelete <= 0) {
+ return
+ }
+
+ val view = store.view(classOf[StreamingQuerySummary]).index("active").first(false).last(false)
+ val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_ => true)
+ toDelete.foreach { e =>
+ store.delete(e.getClass, e.runId)
+ store.removeAllByIndexValues(
+ classOf[StreamingQueryProgressWrapper], "runId", e.runId.toString)
+ }
+ }
+
override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
val startTimestamp = parseProgressTimestamp(event.timestamp)
- activeQueryStatus.putIfAbsent(event.runId,
- new StreamingQueryUIData(event.name, event.id, event.runId, startTimestamp))
+ store.write(new StreamingQuerySummary(
+ event.name,
+ event.id,
+ event.runId,
+ Array.empty[String],
+ startTimestamp,
+ true,
+ None
+ ))
}
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
- val batchTimestamp = parseProgressTimestamp(event.progress.timestamp)
- val queryStatus = activeQueryStatus.getOrDefault(
- event.progress.runId,
- new StreamingQueryUIData(event.progress.name, event.progress.id, event.progress.runId,
- batchTimestamp))
- queryStatus.updateProcess(event.progress, streamingProgressRetention)
+ val runId = event.progress.runId
+ val batchId = event.progress.batchId
+ val timestamp = event.progress.timestamp
+ if (!queryToProgress.contains(event.progress.runId)) {
Review comment:
+1
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
##########
@@ -18,104 +18,139 @@
package org.apache.spark.sql.streaming.ui
import java.util.UUID
-import java.util.concurrent.ConcurrentHashMap
-import scala.collection.JavaConverters._
import scala.collection.mutable
+import com.fasterxml.jackson.annotation.JsonIgnore
+
import org.apache.spark.SparkConf
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}
+import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper._
import org.apache.spark.sql.streaming.ui.UIUtils.parseProgressTimestamp
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.kvstore.KVIndex
/**
* A customized StreamingQueryListener used in structured streaming UI, which contains all
* UI data for both active and inactive query.
- * TODO: Add support for history server.
*/
-private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends StreamingQueryListener {
-
- /**
- * We use runId as the key here instead of id in active query status map,
- * because the runId is unique for every started query, even it its a restart.
- */
- private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID, StreamingQueryUIData]()
- private[ui] val inactiveQueryStatus = new mutable.Queue[StreamingQueryUIData]()
+private[sql] class StreamingQueryStatusListener(
+ conf: SparkConf,
+ store: ElementTrackingStore) extends StreamingQueryListener {
private val streamingProgressRetention =
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES)
private val inactiveQueryStatusRetention =
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES)
+ store.addTrigger(classOf[StreamingQuerySummary], inactiveQueryStatusRetention) { count =>
+ cleanupInactiveQueries(count)
+ }
+
+ private val queryToProgress = new mutable.HashMap[UUID, mutable.Queue[String]]()
+
+ private def cleanupInactiveQueries(count: Long): Unit = {
+ val countToDelete = count - inactiveQueryStatusRetention
+ if (countToDelete <= 0) {
+ return
+ }
+
+ val view = store.view(classOf[StreamingQuerySummary]).index("active").first(false).last(false)
+ val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_ => true)
+ toDelete.foreach { e =>
+ store.delete(e.getClass, e.runId)
+ store.removeAllByIndexValues(
+ classOf[StreamingQueryProgressWrapper], "runId", e.runId.toString)
+ }
+ }
+
override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
val startTimestamp = parseProgressTimestamp(event.timestamp)
- activeQueryStatus.putIfAbsent(event.runId,
- new StreamingQueryUIData(event.name, event.id, event.runId, startTimestamp))
+ store.write(new StreamingQuerySummary(
+ event.name,
+ event.id,
+ event.runId,
+ Array.empty[String],
+ startTimestamp,
+ true,
+ None
+ ))
}
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
- val batchTimestamp = parseProgressTimestamp(event.progress.timestamp)
- val queryStatus = activeQueryStatus.getOrDefault(
- event.progress.runId,
- new StreamingQueryUIData(event.progress.name, event.progress.id, event.progress.runId,
- batchTimestamp))
- queryStatus.updateProcess(event.progress, streamingProgressRetention)
+ val runId = event.progress.runId
+ val batchId = event.progress.batchId
+ val timestamp = event.progress.timestamp
+ if (!queryToProgress.contains(event.progress.runId)) {
+ queryToProgress.put(event.progress.runId, mutable.Queue.empty[String])
+ }
+ val progressIds = queryToProgress(event.progress.runId)
+ progressIds.enqueue(getUniqueId(runId, batchId, timestamp))
+ store.write(new StreamingQueryProgressWrapper(event.progress))
+ while(progressIds.length >= streamingProgressRetention) {
+ val uniqueId = progressIds.dequeue
+ store.delete(classOf[StreamingQueryProgressWrapper], uniqueId)
+ }
}
override def onQueryTerminated(
event: StreamingQueryListener.QueryTerminatedEvent): Unit = synchronized {
- val queryStatus = activeQueryStatus.remove(event.runId)
- if (queryStatus != null) {
- queryStatus.queryTerminated(event)
- inactiveQueryStatus += queryStatus
- while (inactiveQueryStatus.length >= inactiveQueryStatusRetention) {
- inactiveQueryStatus.dequeue()
- }
- }
+ val querySummary = store.read(classOf[StreamingQuerySummary], event.runId)
+ store.write(new StreamingQuerySummary(
+ querySummary.name,
+ querySummary.id,
+ querySummary.runId,
+ querySummary.progressIds,
+ querySummary.startTimestamp,
+ false,
+ querySummary.exception
+ ))
+ queryToProgress.remove(event.runId)
}
+}
- def allQueryStatus: Seq[StreamingQueryUIData] = synchronized {
- activeQueryStatus.values().asScala.toSeq ++ inactiveQueryStatus
- }
+private[sql] class StreamingQuerySummary(
+ val name: String,
+ val id: UUID,
+ @KVIndexParam val runId: UUID,
+ val progressIds: Array[String],
+ val startTimestamp: Long,
+ val isActive: Boolean,
+ val exception: Option[String]) {
+ @JsonIgnore @KVIndex("active")
+ private def activeIndex: Boolean = isActive
+ @JsonIgnore @KVIndex("startTimestamp")
+ private def startTimestampIndex: Long = startTimestamp
}
/**
* This class contains all message related to UI display, each instance corresponds to a single
* [[org.apache.spark.sql.streaming.StreamingQuery]].
*/
-private[ui] class StreamingQueryUIData(
- val name: String,
- val id: UUID,
- val runId: UUID,
- val startTimestamp: Long) {
-
- /** Holds the most recent query progress updates. */
- private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
-
- private var _isActive = true
- private var _exception: Option[String] = None
-
- def isActive: Boolean = synchronized { _isActive }
-
- def exception: Option[String] = synchronized { _exception }
-
- def queryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = synchronized {
- _isActive = false
- _exception = event.exception
- }
-
- def updateProcess(
- newProgress: StreamingQueryProgress, retentionNum: Int): Unit = progressBuffer.synchronized {
- progressBuffer += newProgress
- while (progressBuffer.length >= retentionNum) {
- progressBuffer.dequeue()
- }
- }
-
- def recentProgress: Array[StreamingQueryProgress] = progressBuffer.synchronized {
- progressBuffer.toArray
- }
+private[sql] case class StreamingQueryUIData(
+ summary: StreamingQuerySummary,
+ recentProgress: Array[StreamingQueryProgress],
+ lastProgress: StreamingQueryProgress)
Review comment:
+1
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
##########
@@ -18,104 +18,139 @@
package org.apache.spark.sql.streaming.ui
import java.util.UUID
-import java.util.concurrent.ConcurrentHashMap
-import scala.collection.JavaConverters._
import scala.collection.mutable
+import com.fasterxml.jackson.annotation.JsonIgnore
+
import org.apache.spark.SparkConf
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}
+import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper._
import org.apache.spark.sql.streaming.ui.UIUtils.parseProgressTimestamp
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.kvstore.KVIndex
/**
* A customized StreamingQueryListener used in structured streaming UI, which contains all
* UI data for both active and inactive query.
- * TODO: Add support for history server.
*/
-private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends StreamingQueryListener {
-
- /**
- * We use runId as the key here instead of id in active query status map,
- * because the runId is unique for every started query, even it its a restart.
- */
- private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID, StreamingQueryUIData]()
- private[ui] val inactiveQueryStatus = new mutable.Queue[StreamingQueryUIData]()
+private[sql] class StreamingQueryStatusListener(
+ conf: SparkConf,
+ store: ElementTrackingStore) extends StreamingQueryListener {
private val streamingProgressRetention =
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES)
private val inactiveQueryStatusRetention =
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES)
+ store.addTrigger(classOf[StreamingQuerySummary], inactiveQueryStatusRetention) { count =>
+ cleanupInactiveQueries(count)
+ }
+
+ private val queryToProgress = new mutable.HashMap[UUID, mutable.Queue[String]]()
+
+ private def cleanupInactiveQueries(count: Long): Unit = {
+ val countToDelete = count - inactiveQueryStatusRetention
+ if (countToDelete <= 0) {
+ return
+ }
+
+ val view = store.view(classOf[StreamingQuerySummary]).index("active").first(false).last(false)
+ val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_ => true)
+ toDelete.foreach { e =>
+ store.delete(e.getClass, e.runId)
+ store.removeAllByIndexValues(
+ classOf[StreamingQueryProgressWrapper], "runId", e.runId.toString)
+ }
+ }
+
override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
val startTimestamp = parseProgressTimestamp(event.timestamp)
- activeQueryStatus.putIfAbsent(event.runId,
- new StreamingQueryUIData(event.name, event.id, event.runId, startTimestamp))
+ store.write(new StreamingQuerySummary(
+ event.name,
+ event.id,
+ event.runId,
+ Array.empty[String],
+ startTimestamp,
+ true,
+ None
+ ))
}
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
- val batchTimestamp = parseProgressTimestamp(event.progress.timestamp)
- val queryStatus = activeQueryStatus.getOrDefault(
- event.progress.runId,
- new StreamingQueryUIData(event.progress.name, event.progress.id, event.progress.runId,
- batchTimestamp))
- queryStatus.updateProcess(event.progress, streamingProgressRetention)
+ val runId = event.progress.runId
+ val batchId = event.progress.batchId
+ val timestamp = event.progress.timestamp
+ if (!queryToProgress.contains(event.progress.runId)) {
+ queryToProgress.put(event.progress.runId, mutable.Queue.empty[String])
+ }
+ val progressIds = queryToProgress(event.progress.runId)
+ progressIds.enqueue(getUniqueId(runId, batchId, timestamp))
+ store.write(new StreamingQueryProgressWrapper(event.progress))
+ while(progressIds.length >= streamingProgressRetention) {
+ val uniqueId = progressIds.dequeue
+ store.delete(classOf[StreamingQueryProgressWrapper], uniqueId)
+ }
}
override def onQueryTerminated(
event: StreamingQueryListener.QueryTerminatedEvent): Unit = synchronized {
- val queryStatus = activeQueryStatus.remove(event.runId)
- if (queryStatus != null) {
- queryStatus.queryTerminated(event)
- inactiveQueryStatus += queryStatus
- while (inactiveQueryStatus.length >= inactiveQueryStatusRetention) {
- inactiveQueryStatus.dequeue()
- }
- }
+ val querySummary = store.read(classOf[StreamingQuerySummary], event.runId)
+ store.write(new StreamingQuerySummary(
+ querySummary.name,
+ querySummary.id,
+ querySummary.runId,
+ querySummary.progressIds,
+ querySummary.startTimestamp,
+ false,
+ querySummary.exception
+ ))
+ queryToProgress.remove(event.runId)
}
+}
- def allQueryStatus: Seq[StreamingQueryUIData] = synchronized {
- activeQueryStatus.values().asScala.toSeq ++ inactiveQueryStatus
- }
+private[sql] class StreamingQuerySummary(
Review comment:
ok
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]