Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19751#discussion_r157639162
--- Diff:
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -68,10 +69,25 @@ private[spark] class AppStatusListener(
private val liveTasks = new HashMap[Long, LiveTask]()
private val liveRDDs = new HashMap[Int, LiveRDD]()
private val pools = new HashMap[String, SchedulerPool]()
+  // Keep the active executor count as a separate variable to avoid having to do synchronization
+  // around liveExecutors.
+ @volatile private var activeExecutorCount = 0
-  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
-    case SparkListenerLogStart(version) => sparkVersion = version
-    case _ =>
+  kvstore.addTrigger(classOf[ExecutorSummaryWrapper], conf.get(MAX_RETAINED_DEAD_EXECUTORS))
+    { count => cleanupExecutors(count) }
+
+  kvstore.addTrigger(classOf[JobDataWrapper], conf.get(MAX_RETAINED_JOBS)) { count =>
+ cleanupJobs(count)
+ }
+
+  kvstore.addTrigger(classOf[StageDataWrapper], conf.get(MAX_RETAINED_STAGES)) { count =>
+ cleanupStages(count)
+ }
+
+ kvstore.onFlush {
+ if (!live) {
+ flush()
--- End diff --
Hm, why do we only flush for the history server?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]