Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19211#discussion_r139594359
--- Diff:
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala ---
@@ -42,59 +44,65 @@ class SparkListenerSuite extends SparkFunSuite with
LocalSparkContext with Match
private val mockSparkContext: SparkContext =
Mockito.mock(classOf[SparkContext])
private val mockMetricsSystem: MetricsSystem =
Mockito.mock(classOf[MetricsSystem])
+ private def numDroppedEvents(bus: LiveListenerBus): Long = {
+
bus.metrics.metricRegistry.counter(s"queue.$SHARED_QUEUE.numDroppedEvents").getCount
+ }
+
+ private def queueSize(bus: LiveListenerBus): Int = {
+
bus.metrics.metricRegistry.getGauges().get(s"queue.$SHARED_QUEUE.size").getValue()
+ .asInstanceOf[Int]
+ }
+
+ private def eventProcessingTimeCount(bus: LiveListenerBus): Long = {
+
bus.metrics.metricRegistry.timer(s"queue.$SHARED_QUEUE.listenerProcessingTime").getCount()
+ }
+
test("don't call sc.stop in listener") {
sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
val listener = new SparkContextStoppingListener(sc)
- val bus = new LiveListenerBus(sc.conf)
- bus.addListener(listener)
- // Starting listener bus should flush all buffered events
- bus.start(sc, sc.env.metricsSystem)
- bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
- bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ sc.listenerBus.addToSharedQueue(listener)
+ sc.listenerBus.post(SparkListenerJobEnd(0, jobCompletionTime,
JobSucceeded))
+ sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ sc.stop()
- bus.stop()
assert(listener.sparkExSeen)
}
test("basic creation and shutdown of LiveListenerBus") {
val conf = new SparkConf()
val counter = new BasicJobCounter
val bus = new LiveListenerBus(conf)
- bus.addListener(counter)
+ bus.addToSharedQueue(counter)
// Metrics are initially empty.
assert(bus.metrics.numEventsPosted.getCount === 0)
- assert(bus.metrics.numDroppedEvents.getCount === 0)
- assert(bus.metrics.queueSize.getValue === 0)
- assert(bus.metrics.eventProcessingTime.getCount === 0)
+ assert(numDroppedEvents(bus) === 0)
+ assert(queueSize(bus) === 0)
+ assert(eventProcessingTimeCount(bus) === 0)
// Post five events:
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0,
jobCompletionTime, JobSucceeded)) }
// Five messages should be marked as received and queued, but no
messages should be posted to
// listeners yet because the the listener bus hasn't been started.
assert(bus.metrics.numEventsPosted.getCount === 5)
- assert(bus.metrics.queueSize.getValue === 5)
+ assert(queueSize(bus) === 5)
assert(counter.count === 0)
// Starting listener bus should flush all buffered events
bus.start(mockSparkContext, mockMetricsSystem)
Mockito.verify(mockMetricsSystem).registerSource(bus.metrics)
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(counter.count === 5)
- assert(bus.metrics.queueSize.getValue === 0)
- assert(bus.metrics.eventProcessingTime.getCount === 5)
+ assert(queueSize(bus) === 0)
+ assert(eventProcessingTimeCount(bus) === 5)
// After listener bus has stopped, posting events should not increment
counter
bus.stop()
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0,
jobCompletionTime, JobSucceeded)) }
assert(counter.count === 5)
- assert(bus.metrics.numEventsPosted.getCount === 5)
-
- // Make sure per-listener-class timers were created:
--- End diff --
oh seems we don't need it anymore.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]