uncleGen commented on a change in pull request #28781:
URL: https://github.com/apache/spark/pull/28781#discussion_r533229138
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
##########
@@ -20,102 +20,144 @@ package org.apache.spark.sql.streaming.ui
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
-import scala.collection.JavaConverters._
import scala.collection.mutable
+import com.fasterxml.jackson.annotation.JsonIgnore
+
import org.apache.spark.SparkConf
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.streaming.{StreamingQueryListener,
StreamingQueryProgress}
+import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper._
import org.apache.spark.sql.streaming.ui.UIUtils.parseProgressTimestamp
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.kvstore.KVIndex
/**
* A customized StreamingQueryListener used in structured streaming UI, which
contains all
* UI data for both active and inactive query.
- * TODO: Add support for history server.
*/
-private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends
StreamingQueryListener {
-
- /**
- * We use runId as the key here instead of id in active query status map,
- * because the runId is unique for every started query, even it its a
restart.
- */
- private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID,
StreamingQueryUIData]()
- private[ui] val inactiveQueryStatus = new
mutable.Queue[StreamingQueryUIData]()
+private[sql] class StreamingQueryStatusListener(
+ conf: SparkConf,
+ store: ElementTrackingStore) extends StreamingQueryListener {
private val streamingProgressRetention =
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES)
private val inactiveQueryStatusRetention =
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES)
+ store.addTrigger(classOf[StreamingQueryData], inactiveQueryStatusRetention)
{ count =>
+ cleanupInactiveQueries(count)
+ }
+
+ // Events from the same query run will never be processed concurrently, so
it's safe to
+ // access `progressIds` without any protection.
+ private val queryToProgress = new ConcurrentHashMap[UUID,
mutable.Queue[String]]()
+
+ private def cleanupInactiveQueries(count: Long): Unit = {
+ val view =
store.view(classOf[StreamingQueryData]).index("active").first(false).last(false)
+ val inactiveQueries = KVUtils.viewToSeq(view, Int.MaxValue)(_ => true)
+ val numInactiveQueries = inactiveQueries.size
+ if (numInactiveQueries <= inactiveQueryStatusRetention) {
+ return
+ }
+ val toDelete = inactiveQueries.sortBy(_.endTimestamp.get)
+ .take(numInactiveQueries - inactiveQueryStatusRetention)
+ val runIds = toDelete.map { e =>
+ store.delete(e.getClass, e.runId)
+ e.runId.toString
+ }
+ // Delete wrappers in one pass, as deleting them for each summary is slow
+ store.removeAllByIndexValues(classOf[StreamingQueryProgressWrapper],
"runId", runIds)
+ }
+
override def onQueryStarted(event:
StreamingQueryListener.QueryStartedEvent): Unit = {
val startTimestamp = parseProgressTimestamp(event.timestamp)
- activeQueryStatus.putIfAbsent(event.runId,
- new StreamingQueryUIData(event.name, event.id, event.runId,
startTimestamp))
+ store.write(new StreamingQueryData(
+ event.name,
+ event.id,
+ event.runId,
+ isActive = true,
+ None,
+ startTimestamp
+ ), checkTriggers = true)
}
override def onQueryProgress(event:
StreamingQueryListener.QueryProgressEvent): Unit = {
- val batchTimestamp = parseProgressTimestamp(event.progress.timestamp)
- val queryStatus = activeQueryStatus.getOrDefault(
- event.progress.runId,
- new StreamingQueryUIData(event.progress.name, event.progress.id,
event.progress.runId,
- batchTimestamp))
- queryStatus.updateProcess(event.progress, streamingProgressRetention)
- }
-
- override def onQueryTerminated(
- event: StreamingQueryListener.QueryTerminatedEvent): Unit = synchronized
{
- val queryStatus = activeQueryStatus.remove(event.runId)
- if (queryStatus != null) {
- queryStatus.queryTerminated(event)
- inactiveQueryStatus += queryStatus
- while (inactiveQueryStatus.length >= inactiveQueryStatusRetention) {
- inactiveQueryStatus.dequeue()
- }
+ val runId = event.progress.runId
+ val batchId = event.progress.batchId
+ val timestamp = event.progress.timestamp
+ if (!queryToProgress.containsKey(runId)) {
+ queryToProgress.put(runId, mutable.Queue.empty[String])
+ }
+ val progressIds = queryToProgress.get(runId)
+ progressIds.enqueue(getUniqueId(runId, batchId, timestamp))
+ store.write(new StreamingQueryProgressWrapper(event.progress))
+ while(progressIds.length > streamingProgressRetention) {
Review comment:
Logic update: use `>` instead of `>=` when checking against the retention limit, so that exactly the configured number of progress updates is retained.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryStatusStore.scala
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.ui
+
+import java.util.UUID
+
+import org.apache.spark.sql.streaming.ui.{StreamingQueryData,
StreamingQueryProgressWrapper, StreamingQueryUIData}
+import org.apache.spark.status.KVUtils
+import org.apache.spark.util.kvstore.KVStore
+
+/**
+ * Provides a view of a KVStore with methods that make it easy to query
Streaming Query state.
+ * There's no state kept in this class, so it's ok to have multiple instances
of it in an
+ * application.
+ */
+class StreamingQueryStatusStore(store: KVStore) {
+
+ def allQueryUIData: Seq[StreamingQueryUIData] = {
+ val view =
store.view(classOf[StreamingQueryData]).index("startTimestamp").first(0L)
+ KVUtils.viewToSeq(view, Int.MaxValue)(_ => true).map(makeUIData)
+ }
+
+ // visible for test
+ private[sql] def getQueryProgressData(runId: UUID):
Seq[StreamingQueryProgressWrapper] = {
+ val view = store.view(classOf[StreamingQueryProgressWrapper])
+ .index("runId").first(runId.toString).last(runId.toString)
+ KVUtils.viewToSeq(view, Int.MaxValue)(_ => true)
+ }
+
+ private def makeUIData(summary: StreamingQueryData): StreamingQueryUIData = {
+ val runId = summary.runId.toString
+ val view = store.view(classOf[StreamingQueryProgressWrapper])
+ .index("runId").first(runId).last(runId)
+ val recentProgress = KVUtils.viewToSeq(view, Int.MaxValue)(_ => true)
+ .map(_.progress).sortBy(_.timestamp).toArray
+ StreamingQueryUIData(summary, recentProgress)
Review comment:
Bug fix: read the `StreamingQueryProgressWrapper` entries back from the KVStore when building the UI data.
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
##########
@@ -94,11 +107,112 @@ class StreamingQueryStatusListenerSuite extends
StreamTest {
listener.onQueryStarted(startEvent1)
// result checking
- assert(listener.activeQueryStatus.size() == 1)
- assert(listener.inactiveQueryStatus.length == 1)
- assert(listener.activeQueryStatus.containsKey(runId1))
- assert(listener.activeQueryStatus.get(runId1).id == id)
- assert(listener.inactiveQueryStatus.head.runId == runId0)
- assert(listener.inactiveQueryStatus.head.id == id)
+ assert(queryStore.allQueryUIData.count(_.summary.isActive) == 1)
+ assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).length == 1)
+
assert(queryStore.allQueryUIData.filter(_.summary.isActive).exists(_.summary.runId
== runId1))
+ assert(queryStore.allQueryUIData.filter(_.summary.isActive).exists(uiData
=>
+ uiData.summary.runId == runId1 && uiData.summary.id == id))
+
assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.runId
== runId0)
+
assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.id
== id)
+ }
+
+ test("test small retained queries") {
Review comment:
Added a new unit test covering this behavior.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
##########
@@ -20,102 +20,144 @@ package org.apache.spark.sql.streaming.ui
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
-import scala.collection.JavaConverters._
import scala.collection.mutable
+import com.fasterxml.jackson.annotation.JsonIgnore
+
import org.apache.spark.SparkConf
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.streaming.{StreamingQueryListener,
StreamingQueryProgress}
+import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper._
import org.apache.spark.sql.streaming.ui.UIUtils.parseProgressTimestamp
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.kvstore.KVIndex
/**
* A customized StreamingQueryListener used in structured streaming UI, which
contains all
* UI data for both active and inactive query.
- * TODO: Add support for history server.
*/
-private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends
StreamingQueryListener {
-
- /**
- * We use runId as the key here instead of id in active query status map,
- * because the runId is unique for every started query, even it its a
restart.
- */
- private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID,
StreamingQueryUIData]()
- private[ui] val inactiveQueryStatus = new
mutable.Queue[StreamingQueryUIData]()
+private[sql] class StreamingQueryStatusListener(
+ conf: SparkConf,
+ store: ElementTrackingStore) extends StreamingQueryListener {
private val streamingProgressRetention =
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES)
private val inactiveQueryStatusRetention =
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES)
+ store.addTrigger(classOf[StreamingQueryData], inactiveQueryStatusRetention)
{ count =>
+ cleanupInactiveQueries(count)
+ }
+
+ // Events from the same query run will never be processed concurrently, so
it's safe to
+ // access `progressIds` without any protection.
+ private val queryToProgress = new ConcurrentHashMap[UUID,
mutable.Queue[String]]()
+
+ private def cleanupInactiveQueries(count: Long): Unit = {
+ val view =
store.view(classOf[StreamingQueryData]).index("active").first(false).last(false)
+ val inactiveQueries = KVUtils.viewToSeq(view, Int.MaxValue)(_ => true)
+ val numInactiveQueries = inactiveQueries.size
+ if (numInactiveQueries <= inactiveQueryStatusRetention) {
+ return
+ }
Review comment:
Logic update: use `STREAMING_UI_RETAINED_QUERIES` to clean up inactive
   queries.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
##########
@@ -20,102 +20,144 @@ package org.apache.spark.sql.streaming.ui
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
-import scala.collection.JavaConverters._
import scala.collection.mutable
+import com.fasterxml.jackson.annotation.JsonIgnore
+
import org.apache.spark.SparkConf
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.streaming.{StreamingQueryListener,
StreamingQueryProgress}
+import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper._
import org.apache.spark.sql.streaming.ui.UIUtils.parseProgressTimestamp
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.kvstore.KVIndex
/**
* A customized StreamingQueryListener used in structured streaming UI, which
contains all
* UI data for both active and inactive query.
- * TODO: Add support for history server.
*/
-private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends
StreamingQueryListener {
-
- /**
- * We use runId as the key here instead of id in active query status map,
- * because the runId is unique for every started query, even it its a
restart.
- */
- private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID,
StreamingQueryUIData]()
- private[ui] val inactiveQueryStatus = new
mutable.Queue[StreamingQueryUIData]()
+private[sql] class StreamingQueryStatusListener(
+ conf: SparkConf,
+ store: ElementTrackingStore) extends StreamingQueryListener {
private val streamingProgressRetention =
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES)
private val inactiveQueryStatusRetention =
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES)
+ store.addTrigger(classOf[StreamingQueryData], inactiveQueryStatusRetention)
{ count =>
+ cleanupInactiveQueries(count)
+ }
+
+ // Events from the same query run will never be processed concurrently, so
it's safe to
+ // access `progressIds` without any protection.
+ private val queryToProgress = new ConcurrentHashMap[UUID,
mutable.Queue[String]]()
+
+ private def cleanupInactiveQueries(count: Long): Unit = {
+ val view =
store.view(classOf[StreamingQueryData]).index("active").first(false).last(false)
+ val inactiveQueries = KVUtils.viewToSeq(view, Int.MaxValue)(_ => true)
+ val numInactiveQueries = inactiveQueries.size
+ if (numInactiveQueries <= inactiveQueryStatusRetention) {
+ return
+ }
+ val toDelete = inactiveQueries.sortBy(_.endTimestamp.get)
+ .take(numInactiveQueries - inactiveQueryStatusRetention)
+ val runIds = toDelete.map { e =>
+ store.delete(e.getClass, e.runId)
+ e.runId.toString
+ }
+ // Delete wrappers in one pass, as deleting them for each summary is slow
+ store.removeAllByIndexValues(classOf[StreamingQueryProgressWrapper],
"runId", runIds)
+ }
+
override def onQueryStarted(event:
StreamingQueryListener.QueryStartedEvent): Unit = {
val startTimestamp = parseProgressTimestamp(event.timestamp)
- activeQueryStatus.putIfAbsent(event.runId,
- new StreamingQueryUIData(event.name, event.id, event.runId,
startTimestamp))
+ store.write(new StreamingQueryData(
+ event.name,
+ event.id,
+ event.runId,
+ isActive = true,
+ None,
+ startTimestamp
+ ), checkTriggers = true)
}
override def onQueryProgress(event:
StreamingQueryListener.QueryProgressEvent): Unit = {
- val batchTimestamp = parseProgressTimestamp(event.progress.timestamp)
- val queryStatus = activeQueryStatus.getOrDefault(
- event.progress.runId,
- new StreamingQueryUIData(event.progress.name, event.progress.id,
event.progress.runId,
- batchTimestamp))
- queryStatus.updateProcess(event.progress, streamingProgressRetention)
- }
-
- override def onQueryTerminated(
- event: StreamingQueryListener.QueryTerminatedEvent): Unit = synchronized
{
- val queryStatus = activeQueryStatus.remove(event.runId)
- if (queryStatus != null) {
- queryStatus.queryTerminated(event)
- inactiveQueryStatus += queryStatus
- while (inactiveQueryStatus.length >= inactiveQueryStatusRetention) {
- inactiveQueryStatus.dequeue()
- }
+ val runId = event.progress.runId
+ val batchId = event.progress.batchId
+ val timestamp = event.progress.timestamp
+ if (!queryToProgress.containsKey(runId)) {
+ queryToProgress.put(runId, mutable.Queue.empty[String])
+ }
+ val progressIds = queryToProgress.get(runId)
+ progressIds.enqueue(getUniqueId(runId, batchId, timestamp))
+ store.write(new StreamingQueryProgressWrapper(event.progress))
+ while(progressIds.length > streamingProgressRetention) {
+ val uniqueId = progressIds.dequeue
+ store.delete(classOf[StreamingQueryProgressWrapper], uniqueId)
}
}
- def allQueryStatus: Seq[StreamingQueryUIData] = synchronized {
- activeQueryStatus.values().asScala.toSeq ++ inactiveQueryStatus
+ override def onQueryTerminated(
+ event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
+ val querySummary = store.read(classOf[StreamingQueryData], event.runId)
+ val curTime = System.currentTimeMillis()
+ store.write(new StreamingQueryData(
+ querySummary.name,
+ querySummary.id,
+ querySummary.runId,
+ isActive = false,
+ querySummary.exception,
+ querySummary.startTimestamp,
+ Some(curTime)
+ ), checkTriggers = true)
+ queryToProgress.remove(event.runId)
}
}
+private[sql] class StreamingQueryData(
+ val name: String,
+ val id: UUID,
+ @KVIndexParam val runId: UUID,
+ @KVIndexParam("active") val isActive: Boolean,
+ val exception: Option[String],
+ @KVIndexParam("startTimestamp") val startTimestamp: Long,
+ val endTimestamp: Option[Long] = None)
Review comment:
adding `endTimestamp` to help to clean inactive queries.
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
##########
@@ -94,11 +107,112 @@ class StreamingQueryStatusListenerSuite extends
StreamTest {
listener.onQueryStarted(startEvent1)
// result checking
- assert(listener.activeQueryStatus.size() == 1)
- assert(listener.inactiveQueryStatus.length == 1)
- assert(listener.activeQueryStatus.containsKey(runId1))
- assert(listener.activeQueryStatus.get(runId1).id == id)
- assert(listener.inactiveQueryStatus.head.runId == runId0)
- assert(listener.inactiveQueryStatus.head.id == id)
+ assert(queryStore.allQueryUIData.count(_.summary.isActive) == 1)
+ assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).length == 1)
+
assert(queryStore.allQueryUIData.filter(_.summary.isActive).exists(_.summary.runId
== runId1))
+ assert(queryStore.allQueryUIData.filter(_.summary.isActive).exists(uiData
=>
+ uiData.summary.runId == runId1 && uiData.summary.id == id))
+
assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.runId
== runId0)
+
assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.id
== id)
+ }
+
+ test("test small retained queries") {
+ val kvStore = new ElementTrackingStore(new InMemoryStore(), sparkConf)
+ val conf = spark.sparkContext.conf
+ conf.set(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES.key, "2")
+ val listener = new StreamingQueryStatusListener(conf, kvStore)
+ val queryStore = new StreamingQueryStatusStore(kvStore)
+
+ def addNewQuery(): (UUID, UUID) = {
+ val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") //
ISO8601
+ format.setTimeZone(getTimeZone("UTC"))
+ val id = UUID.randomUUID()
+ val runId = UUID.randomUUID()
+ val startEvent = new StreamingQueryListener.QueryStartedEvent(
+ id, runId, "test1", format.format(new
Date(System.currentTimeMillis())))
+ listener.onQueryStarted(startEvent)
+ (id, runId)
+ }
+
+ val (id1, runId1) = addNewQuery()
+ val (id2, runId2) = addNewQuery()
+ val (id3, runId3) = addNewQuery()
+
+ assert(queryStore.allQueryUIData.count(!_.summary.isActive) == 0)
+
+ val terminateEvent1 = new StreamingQueryListener.QueryTerminatedEvent(id1,
runId1, None)
+ listener.onQueryTerminated(terminateEvent1)
+ // sleep 100 mills to make sure clean work complete
+ Thread.sleep(100)
+ assert(queryStore.allQueryUIData.count(!_.summary.isActive) == 1)
+ var inactiveQueries =
queryStore.allQueryUIData.filter(!_.summary.isActive).map(_.summary.id)
+ assert(inactiveQueries == Seq(id1))
+
+ val terminateEvent2 = new StreamingQueryListener.QueryTerminatedEvent(id2,
runId2, None)
+ listener.onQueryTerminated(terminateEvent2)
+ // sleep 100 mills to make sure clean work complete
+ Thread.sleep(100)
+ assert(queryStore.allQueryUIData.count(!_.summary.isActive) == 2)
+ inactiveQueries =
queryStore.allQueryUIData.filter(!_.summary.isActive).map(_.summary.id)
+ assert(inactiveQueries == Seq(id1, id2))
+
+ val terminateEvent3 = new StreamingQueryListener.QueryTerminatedEvent(id3,
runId3, None)
+ listener.onQueryTerminated(terminateEvent3)
+ // sleep 100 mills to make sure clean work complete
+ Thread.sleep(100)
+ assert(queryStore.allQueryUIData.count(!_.summary.isActive) == 2)
+ inactiveQueries =
queryStore.allQueryUIData.filter(!_.summary.isActive).map(_.summary.id)
+ assert(inactiveQueries == Seq(id2, id3))
+ }
+
+ test("test small retained progress") {
Review comment:
Added a new unit test covering this behavior.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
##########
@@ -20,102 +20,144 @@ package org.apache.spark.sql.streaming.ui
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
-import scala.collection.JavaConverters._
import scala.collection.mutable
+import com.fasterxml.jackson.annotation.JsonIgnore
+
import org.apache.spark.SparkConf
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.streaming.{StreamingQueryListener,
StreamingQueryProgress}
+import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper._
import org.apache.spark.sql.streaming.ui.UIUtils.parseProgressTimestamp
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.kvstore.KVIndex
/**
* A customized StreamingQueryListener used in structured streaming UI, which
contains all
* UI data for both active and inactive query.
- * TODO: Add support for history server.
*/
-private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends
StreamingQueryListener {
-
- /**
- * We use runId as the key here instead of id in active query status map,
- * because the runId is unique for every started query, even it its a
restart.
- */
- private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID,
StreamingQueryUIData]()
- private[ui] val inactiveQueryStatus = new
mutable.Queue[StreamingQueryUIData]()
+private[sql] class StreamingQueryStatusListener(
+ conf: SparkConf,
+ store: ElementTrackingStore) extends StreamingQueryListener {
private val streamingProgressRetention =
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES)
private val inactiveQueryStatusRetention =
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES)
+ store.addTrigger(classOf[StreamingQueryData], inactiveQueryStatusRetention)
{ count =>
+ cleanupInactiveQueries(count)
+ }
+
+ // Events from the same query run will never be processed concurrently, so
it's safe to
+ // access `progressIds` without any protection.
+ private val queryToProgress = new ConcurrentHashMap[UUID,
mutable.Queue[String]]()
+
+ private def cleanupInactiveQueries(count: Long): Unit = {
+ val view =
store.view(classOf[StreamingQueryData]).index("active").first(false).last(false)
+ val inactiveQueries = KVUtils.viewToSeq(view, Int.MaxValue)(_ => true)
+ val numInactiveQueries = inactiveQueries.size
+ if (numInactiveQueries <= inactiveQueryStatusRetention) {
+ return
+ }
+ val toDelete = inactiveQueries.sortBy(_.endTimestamp.get)
+ .take(numInactiveQueries - inactiveQueryStatusRetention)
Review comment:
New logic: clean up the earliest-terminated inactive queries first (sorted by `endTimestamp`).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]