GitHub user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/19751#discussion_r156811638
--- Diff: core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala ---
@@ -851,6 +842,97 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
}
}
+ test("eviction of old data") {
+ val testConf = conf.clone()
+ .set(MAX_RETAINED_JOBS, 2)
+ .set(MAX_RETAINED_STAGES, 2)
+ .set(MAX_RETAINED_TASKS_PER_STAGE, 2)
+ .set(MAX_RETAINED_DEAD_EXECUTORS, 1)
+ val listener = new AppStatusListener(store, testConf, true)
+
+ // Start 3 jobs, all should be kept. Stop one, it should be evicted.
+ time += 1
+ listener.onJobStart(SparkListenerJobStart(1, time, Nil, null))
+ listener.onJobStart(SparkListenerJobStart(2, time, Nil, null))
+ listener.onJobStart(SparkListenerJobStart(3, time, Nil, null))
+ assert(store.count(classOf[JobDataWrapper]) === 3)
+
+ time += 1
+ listener.onJobEnd(SparkListenerJobEnd(2, time, JobSucceeded))
+ assert(store.count(classOf[JobDataWrapper]) === 2)
+ intercept[NoSuchElementException] {
+ store.read(classOf[JobDataWrapper], 2)
+ }
+
+ // Start 3 stages, all should be kept. Stop 2 of them, the oldest stopped one should be
+ // deleted. Start a new attempt of the second stopped one, and verify that the stage graph
--- End diff --
There is no DAG here; the test controls what "oldest" means. In this case
"oldest" means "first stage in the list", which is also the stage with the
smallest id, and that matches the actual behavior of the listener.
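To make the ordering argument concrete, here is a minimal sketch of a retention policy where "oldest" is defined purely by stage id rather than by any DAG relationship. `StageEntry` and `StageRetention` are hypothetical names used only for illustration; they are not Spark's `AppStatusListener` or its store wrappers, and the sketch assumes, as the comment states, that finished stages are evicted smallest id first while running stages are never evicted.

```scala
import scala.collection.mutable

// Hypothetical, simplified stage record (not Spark's StageDataWrapper).
final case class StageEntry(id: Int, attempt: Int, completed: Boolean)

// Hypothetical retention policy: once the number of stored stages exceeds
// maxRetained, drop completed stages in ascending (id, attempt) order.
final class StageRetention(maxRetained: Int) {
  // Keyed by (stage id, attempt id) so a new attempt is a separate entry.
  private val stages = mutable.LinkedHashMap[(Int, Int), StageEntry]()

  def start(id: Int, attempt: Int = 0): Unit = {
    stages((id, attempt)) = StageEntry(id, attempt, completed = false)
    evictIfNeeded()
  }

  def complete(id: Int, attempt: Int = 0): Unit = {
    stages.get((id, attempt)).foreach { s =>
      stages((id, attempt)) = s.copy(completed = true)
    }
    evictIfNeeded()
  }

  // Ids of the stages currently retained, sorted for easy comparison.
  def retainedIds: Seq[Int] = stages.values.map(_.id).toList.sorted

  // "Oldest" here simply means smallest id; running stages are skipped,
  // so the store may temporarily hold more than maxRetained entries.
  private def evictIfNeeded(): Unit = {
    val excess = stages.size - maxRetained
    if (excess > 0) {
      val victims = stages.values.filter(_.completed).toList
        .sortBy(s => (s.id, s.attempt))
        .take(excess)
      victims.foreach(s => stages.remove((s.id, s.attempt)))
    }
  }
}
```

With `maxRetained = 2`, starting stages 1, 2 and 3 keeps all three because none has finished; completing stage 2 then evicts it. If stages 1 and 2 both finish before stage 3 starts, stage 1 is the one dropped, because it has the smallest id.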