git commit: SPARK-1587 Fix thread leak
Repository: spark Updated Branches: refs/heads/master bb68f4774 - dd681f502 SPARK-1587 Fix thread leak mvn test fails (intermittently) due to thread leak - since scalatest runs all tests in same vm. Author: Mridul Muralidharan mridul...@apache.org Closes #504 from mridulm/resource_leak_fixes and squashes the following commits: a5d10d0 [Mridul Muralidharan] Prevent thread leaks while running tests : cleanup all threads when SparkContext.stop is invoked. Causes tests to fail 7b5e19c [Mridul Muralidharan] Prevent NPE while running tests Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dd681f50 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dd681f50 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dd681f50 Branch: refs/heads/master Commit: dd681f502eafe39cfb8a5a62ea2d28016ac6013d Parents: bb68f47 Author: Mridul Muralidharan mridul...@apache.org Authored: Wed Apr 23 23:20:55 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Wed Apr 23 23:20:55 2014 -0700 -- .../apache/spark/metrics/MetricsSystem.scala| 22 --- .../spark/scheduler/TaskSchedulerImpl.scala | 1 + .../org/apache/spark/storage/BlockManager.scala | 2 ++ .../apache/spark/storage/DiskBlockManager.scala | 28 .../spark/storage/ShuffleBlockManager.scala | 4 +++ .../scala/org/apache/spark/ui/JettyUtils.scala | 1 + .../spark/storage/DiskBlockManagerSuite.scala | 5 7 files changed, 42 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/dd681f50/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala -- diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index c5bda20..651511d 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -129,17 +129,19 @@ private[spark] class MetricsSystem private (val instance: String, sinkConfigs.foreach { kv = val classPath = kv._2.getProperty(class) - try { -val sink = Class.forName(classPath) - .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager]) - .newInstance(kv._2, registry, securityMgr) -if (kv._1 == servlet) { - metricsServlet = Some(sink.asInstanceOf[MetricsServlet]) -} else { - sinks += sink.asInstanceOf[Sink] + if (null != classPath) { +try { + val sink = Class.forName(classPath) +.getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager]) +.newInstance(kv._2, registry, securityMgr) + if (kv._1 == servlet) { +metricsServlet = Some(sink.asInstanceOf[MetricsServlet]) + } else { +sinks += sink.asInstanceOf[Sink] + } +} catch { + case e: Exception = logError(Sink class + classPath + cannot be instantialized, e) } - } catch { -case e: Exception = logError(Sink class + classPath + cannot be instantialized, e) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/dd681f50/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index be19d9b..5a68f38 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -356,6 +356,7 @@ private[spark] class TaskSchedulerImpl( if (taskResultGetter != null) { taskResultGetter.stop() } +starvationTimer.cancel() // sleeping for an arbitrary 1 seconds to ensure that messages are sent out. Thread.sleep(1000L) http://git-wip-us.apache.org/repos/asf/spark/blob/dd681f50/core/src/main/scala/org/apache/spark/storage/BlockManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index f15fa4d..ccd5c53 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1021,6 +1021,8 @@ private[spark] class BlockManager( heartBeatTask.cancel() } connectionManager.stop() +shuffleBlockManager.stop() +diskBlockManager.stop()
git commit: SPARK-1587 Fix thread leak
Repository: spark Updated Branches: refs/heads/branch-1.0 e8907718a - 8684a15e5 SPARK-1587 Fix thread leak mvn test fails (intermittently) due to thread leak - since scalatest runs all tests in same vm. Author: Mridul Muralidharan mridul...@apache.org Closes #504 from mridulm/resource_leak_fixes and squashes the following commits: a5d10d0 [Mridul Muralidharan] Prevent thread leaks while running tests : cleanup all threads when SparkContext.stop is invoked. Causes tests to fail 7b5e19c [Mridul Muralidharan] Prevent NPE while running tests (cherry picked from commit dd681f502eafe39cfb8a5a62ea2d28016ac6013d) Signed-off-by: Aaron Davidson aa...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8684a15e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8684a15e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8684a15e Branch: refs/heads/branch-1.0 Commit: 8684a15e50978480831d5f5e52684a61fe4ee7a6 Parents: e890771 Author: Mridul Muralidharan mridul...@apache.org Authored: Wed Apr 23 23:20:55 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Wed Apr 23 23:21:16 2014 -0700 -- .../apache/spark/metrics/MetricsSystem.scala| 22 --- .../spark/scheduler/TaskSchedulerImpl.scala | 1 + .../org/apache/spark/storage/BlockManager.scala | 2 ++ .../apache/spark/storage/DiskBlockManager.scala | 28 .../spark/storage/ShuffleBlockManager.scala | 4 +++ .../scala/org/apache/spark/ui/JettyUtils.scala | 1 + .../spark/storage/DiskBlockManagerSuite.scala | 5 7 files changed, 42 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8684a15e/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala -- diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index c5bda20..651511d 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -129,17 +129,19 @@ private[spark] class MetricsSystem private (val instance: String, sinkConfigs.foreach { kv = val classPath = kv._2.getProperty(class) - try { -val sink = Class.forName(classPath) - .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager]) - .newInstance(kv._2, registry, securityMgr) -if (kv._1 == servlet) { - metricsServlet = Some(sink.asInstanceOf[MetricsServlet]) -} else { - sinks += sink.asInstanceOf[Sink] + if (null != classPath) { +try { + val sink = Class.forName(classPath) +.getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager]) +.newInstance(kv._2, registry, securityMgr) + if (kv._1 == servlet) { +metricsServlet = Some(sink.asInstanceOf[MetricsServlet]) + } else { +sinks += sink.asInstanceOf[Sink] + } +} catch { + case e: Exception = logError(Sink class + classPath + cannot be instantialized, e) } - } catch { -case e: Exception = logError(Sink class + classPath + cannot be instantialized, e) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/8684a15e/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index be19d9b..5a68f38 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -356,6 +356,7 @@ private[spark] class TaskSchedulerImpl( if (taskResultGetter != null) { taskResultGetter.stop() } +starvationTimer.cancel() // sleeping for an arbitrary 1 seconds to ensure that messages are sent out. Thread.sleep(1000L) http://git-wip-us.apache.org/repos/asf/spark/blob/8684a15e/core/src/main/scala/org/apache/spark/storage/BlockManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index f15fa4d..ccd5c53 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1021,6 +1021,8 @@ private[spark] class BlockManager(