This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 9f8c7a2 [SPARK-28709][DSTREAMS] Fix StreamingContext leak through Streaming 9f8c7a2 is described below commit 9f8c7a280476d37fb430da0adbde5d61e8a40714 Author: Nikita Gorbachevsky <niki...@playtika.com> AuthorDate: Mon Aug 26 09:30:36 2019 -0500 [SPARK-28709][DSTREAMS] Fix StreamingContext leak through Streaming ## What changes were proposed in this pull request? In my application, Spark Streaming is restarted programmatically: the StreamingContext is stopped (without stopping the SparkContext), and a new one is then created and started. I use this for automatic detection of Kafka topic/partition changes and for automatic failover on non-fatal exceptions. However, I noticed that after multiple restarts the driver fails with an OOM. While investigating a heap dump, I found that a stopped StreamingContext object is never reclaimed by the GC. <img width="1901" alt="Screen Shot 2019-08-14 at 12 23 33" src="https://user-images.githubusercontent.com/13151161/63010149-83f4c200-be8e-11e9-9f48-12b6e97839f4.png"> There are several places that hold a reference to it: 1. StreamingTab registers StreamingJobProgressListener, which holds a reference to the StreamingContext, directly on the LiveListenerBus shared queue via ssc.sc.addSparkListener(listener). However, this listener isn't unregistered in the stop method. 2. The JSON handlers (/streaming/json and /streaming/batch/json) aren't unregistered from SparkUI, even though they hold a reference to StreamingJobProgressListener. Basically the same issue affects all pages; I assume renderJsonHandler should be added to the pageToHandlers cache in attachPage so that it is also unregistered in detachPage. 3. SparkUI holds a reference to StreamingJobProgressListener in the corresponding field, which isn't cleared after the StreamingContext is stopped. ## How was this patch tested? Added tests to the existing test suites. 
After i applied these changes via reflection in my app OOM on driver side gone. Closes #25439 from choojoyq/SPARK-28709-fix-streaming-context-leak-on-stop. Authored-by: Nikita Gorbachevsky <niki...@playtika.com> Signed-off-by: Sean Owen <sean.o...@databricks.com> --- .../main/scala/org/apache/spark/ui/SparkUI.scala | 3 +++ .../src/main/scala/org/apache/spark/ui/WebUI.scala | 1 + .../apache/spark/streaming/StreamingContext.scala | 29 ++++++++++++++++++---- .../apache/spark/streaming/ui/StreamingTab.scala | 28 ++++++--------------- .../apache/spark/streaming/InputStreamsSuite.scala | 4 --- .../spark/streaming/StreamingContextSuite.scala | 24 ++++++++++++++++++ .../apache/spark/streaming/UISeleniumSuite.scala | 4 +++ 7 files changed, 63 insertions(+), 30 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 1175bc2..6fb8e45 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -138,6 +138,9 @@ private[spark] class SparkUI private ( streamingJobProgressListener = Option(sparkListener) } + def clearStreamingJobProgressListener(): Unit = { + streamingJobProgressListener = None + } } private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String) diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 54ae258..1fe822a 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -93,6 +93,7 @@ private[spark] abstract class WebUI( attachHandler(renderJsonHandler) val handlers = pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]()) handlers += renderHandler + handlers += renderJsonHandler } /** Attaches a handler to this UI. 
*/ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 15ebef2..48913ea 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -38,7 +38,6 @@ import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.input.FixedLengthBinaryInputFormat import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.UI._ import org.apache.spark.rdd.{RDD, RDDOperationScope} import org.apache.spark.scheduler.LiveListenerBus import org.apache.spark.serializer.SerializationDebugger @@ -189,10 +188,9 @@ class StreamingContext private[streaming] ( private[streaming] val progressListener = new StreamingJobProgressListener(this) private[streaming] val uiTab: Option[StreamingTab] = - if (conf.get(UI_ENABLED)) { - Some(new StreamingTab(this)) - } else { - None + sparkContext.ui match { + case Some(ui) => Some(new StreamingTab(this, ui)) + case None => None } /* Initializing a streamingSource to register metrics */ @@ -511,6 +509,10 @@ class StreamingContext private[streaming] ( scheduler.listenerBus.addListener(streamingListener) } + def removeStreamingListener(streamingListener: StreamingListener): Unit = { + scheduler.listenerBus.removeListener(streamingListener) + } + private def validate() { assert(graph != null, "Graph is null") graph.validate() @@ -575,6 +577,8 @@ class StreamingContext private[streaming] ( try { validate() + registerProgressListener() + // Start the streaming scheduler in a new thread, so that thread local properties // like call sites and job groups can be reset without affecting those of the // current thread. 
@@ -690,6 +694,9 @@ class StreamingContext private[streaming] ( Utils.tryLogNonFatalError { uiTab.foreach(_.detach()) } + Utils.tryLogNonFatalError { + unregisterProgressListener() + } StreamingContext.setActiveContext(null) Utils.tryLogNonFatalError { waiter.notifyStop() @@ -716,6 +723,18 @@ class StreamingContext private[streaming] ( // Do not stop SparkContext, let its own shutdown hook stop it stop(stopSparkContext = false, stopGracefully = stopGracefully) } + + private def registerProgressListener(): Unit = { + addStreamingListener(progressListener) + sc.addSparkListener(progressListener) + sc.ui.foreach(_.setStreamingJobProgressListener(progressListener)) + } + + private def unregisterProgressListener(): Unit = { + removeStreamingListener(progressListener) + sc.removeSparkListener(progressListener) + sc.ui.foreach(_.clearStreamingJobProgressListener()) + } } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala index 25e7125..13357db 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala @@ -17,7 +17,6 @@ package org.apache.spark.streaming.ui -import org.apache.spark.SparkException import org.apache.spark.internal.Logging import org.apache.spark.streaming.StreamingContext import org.apache.spark.ui.{SparkUI, SparkUITab} @@ -26,37 +25,24 @@ import org.apache.spark.ui.{SparkUI, SparkUITab} * Spark Web UI tab that shows statistics of a streaming job. * This assumes the given SparkContext has enabled its SparkUI. 
*/ -private[spark] class StreamingTab(val ssc: StreamingContext) - extends SparkUITab(StreamingTab.getSparkUI(ssc), "streaming") with Logging { - - import StreamingTab._ +private[spark] class StreamingTab(val ssc: StreamingContext, sparkUI: SparkUI) + extends SparkUITab(sparkUI, "streaming") with Logging { private val STATIC_RESOURCE_DIR = "org/apache/spark/streaming/ui/static" - val parent = getSparkUI(ssc) + val parent = sparkUI val listener = ssc.progressListener - ssc.addStreamingListener(listener) - ssc.sc.addSparkListener(listener) - parent.setStreamingJobProgressListener(listener) attachPage(new StreamingPage(this)) attachPage(new BatchPage(this)) def attach() { - getSparkUI(ssc).attachTab(this) - getSparkUI(ssc).addStaticHandler(STATIC_RESOURCE_DIR, "/static/streaming") + parent.attachTab(this) + parent.addStaticHandler(STATIC_RESOURCE_DIR, "/static/streaming") } def detach() { - getSparkUI(ssc).detachTab(this) - getSparkUI(ssc).detachHandler("/static/streaming") - } -} - -private object StreamingTab { - def getSparkUI(ssc: StreamingContext): SparkUI = { - ssc.sc.ui.getOrElse { - throw new SparkException("Parent SparkUI to attach this tab to not found!") - } + parent.detachTab(this) + parent.detachHandler("/static/streaming") } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 035ed4a..0792770 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -52,8 +52,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Set up the streaming context and input streams withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc => - ssc.addStreamingListener(ssc.progressListener) - val input = Seq(1, 2, 3, 4, 5) // Use "batchCount" to make sure we check the result after all batches finish val 
batchCounter = new BatchCounter(ssc) @@ -106,8 +104,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { testServer.start() withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc => - ssc.addStreamingListener(ssc.progressListener) - val batchCounter = new BatchCounter(ssc) val networkStream = ssc.socketTextStream( "localhost", testServer.port, StorageLevel.MEMORY_AND_DISK) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 5cda6f9..56c7cbf 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -34,6 +34,7 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark._ import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.UI.UI_ENABLED import org.apache.spark.metrics.MetricsSystem import org.apache.spark.metrics.source.Source import org.apache.spark.storage.StorageLevel @@ -392,6 +393,29 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL assert(!sourcesAfterStop.contains(streamingSourceAfterStop)) } + test("SPARK-28709 registering and de-registering of progressListener") { + val conf = new SparkConf().setMaster(master).setAppName(appName) + conf.set(UI_ENABLED, true) + + ssc = new StreamingContext(conf, batchDuration) + + assert(ssc.sc.ui.isDefined, "Spark UI is not started!") + val sparkUI = ssc.sc.ui.get + + addInputStream(ssc).register() + ssc.start() + + assert(ssc.scheduler.listenerBus.listeners.contains(ssc.progressListener)) + assert(ssc.sc.listenerBus.listeners.contains(ssc.progressListener)) + assert(sparkUI.getStreamingJobProgressListener.get == ssc.progressListener) + + ssc.stop() + + assert(!ssc.scheduler.listenerBus.listeners.contains(ssc.progressListener)) + 
assert(!ssc.sc.listenerBus.listeners.contains(ssc.progressListener)) + assert(sparkUI.getStreamingJobProgressListener.isEmpty) + } + test("awaitTermination") { ssc = new StreamingContext(master, appName, batchDuration) val inputStream = addInputStream(ssc) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala index 483a751..1d34221 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala @@ -97,6 +97,8 @@ class UISeleniumSuite val sparkUI = ssc.sparkContext.ui.get + sparkUI.getHandlers.count(_.getContextPath.contains("/streaming")) should be (5) + eventually(timeout(10.seconds), interval(50.milliseconds)) { go to (sparkUI.webUrl.stripSuffix("/")) find(cssSelector( """ul li a[href*="streaming"]""")) should not be (None) @@ -196,6 +198,8 @@ class UISeleniumSuite ssc.stop(false) + sparkUI.getHandlers.count(_.getContextPath.contains("/streaming")) should be (0) + eventually(timeout(10.seconds), interval(50.milliseconds)) { go to (sparkUI.webUrl.stripSuffix("/")) find(cssSelector( """ul li a[href*="streaming"]""")) should be(None) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org