git commit: SPARK-1587 Fix thread leak

2014-04-24 Thread adav
Repository: spark
Updated Branches:
  refs/heads/master bb68f4774 -> dd681f502


SPARK-1587 Fix thread leak

mvn test fails (intermittently) due to a thread leak, since scalatest runs all 
tests in the same VM.

Author: Mridul Muralidharan mridul...@apache.org

Closes #504 from mridulm/resource_leak_fixes and squashes the following commits:

a5d10d0 [Mridul Muralidharan] Prevent thread leaks while running tests: 
clean up all threads when SparkContext.stop is invoked. Leaked threads cause tests to fail
7b5e19c [Mridul Muralidharan] Prevent NPE while running tests


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dd681f50
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dd681f50
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dd681f50

Branch: refs/heads/master
Commit: dd681f502eafe39cfb8a5a62ea2d28016ac6013d
Parents: bb68f47
Author: Mridul Muralidharan mridul...@apache.org
Authored: Wed Apr 23 23:20:55 2014 -0700
Committer: Aaron Davidson aa...@databricks.com
Committed: Wed Apr 23 23:20:55 2014 -0700

--
 .../apache/spark/metrics/MetricsSystem.scala| 22 ---
 .../spark/scheduler/TaskSchedulerImpl.scala |  1 +
 .../org/apache/spark/storage/BlockManager.scala |  2 ++
 .../apache/spark/storage/DiskBlockManager.scala | 28 
 .../spark/storage/ShuffleBlockManager.scala |  4 +++
 .../scala/org/apache/spark/ui/JettyUtils.scala  |  1 +
 .../spark/storage/DiskBlockManagerSuite.scala   |  5 
 7 files changed, 42 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/dd681f50/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
--
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala 
b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index c5bda20..651511d 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -129,17 +129,19 @@ private[spark] class MetricsSystem private (val instance: 
String,
 
 sinkConfigs.foreach { kv =
   val classPath = kv._2.getProperty(class)
-  try {
-val sink = Class.forName(classPath)
-  .getConstructor(classOf[Properties], classOf[MetricRegistry], 
classOf[SecurityManager])
-  .newInstance(kv._2, registry, securityMgr)
-if (kv._1 == servlet) {
-   metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
-} else {
-  sinks += sink.asInstanceOf[Sink]
+  if (null != classPath) {
+try {
+  val sink = Class.forName(classPath)
+.getConstructor(classOf[Properties], classOf[MetricRegistry], 
classOf[SecurityManager])
+.newInstance(kv._2, registry, securityMgr)
+  if (kv._1 == servlet) {
+metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
+  } else {
+sinks += sink.asInstanceOf[Sink]
+  }
+} catch {
+  case e: Exception = logError(Sink class  + classPath +  cannot 
be instantialized, e)
 }
-  } catch {
-case e: Exception = logError(Sink class  + classPath +  cannot be 
instantialized, e)
   }
 }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/dd681f50/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index be19d9b..5a68f38 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -356,6 +356,7 @@ private[spark] class TaskSchedulerImpl(
 if (taskResultGetter != null) {
   taskResultGetter.stop()
 }
+starvationTimer.cancel()
 
 // sleeping for an arbitrary 1 seconds to ensure that messages are sent 
out.
 Thread.sleep(1000L)

http://git-wip-us.apache.org/repos/asf/spark/blob/dd681f50/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
--
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index f15fa4d..ccd5c53 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1021,6 +1021,8 @@ private[spark] class BlockManager(
   heartBeatTask.cancel()
 }
 connectionManager.stop()
+shuffleBlockManager.stop()
+diskBlockManager.stop()
 

git commit: SPARK-1587 Fix thread leak

2014-04-24 Thread adav
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 e8907718a -> 8684a15e5


SPARK-1587 Fix thread leak

mvn test fails (intermittently) due to a thread leak, since scalatest runs all 
tests in the same VM.

Author: Mridul Muralidharan mridul...@apache.org

Closes #504 from mridulm/resource_leak_fixes and squashes the following commits:

a5d10d0 [Mridul Muralidharan] Prevent thread leaks while running tests: 
clean up all threads when SparkContext.stop is invoked. Leaked threads cause tests to fail
7b5e19c [Mridul Muralidharan] Prevent NPE while running tests

(cherry picked from commit dd681f502eafe39cfb8a5a62ea2d28016ac6013d)
Signed-off-by: Aaron Davidson aa...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8684a15e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8684a15e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8684a15e

Branch: refs/heads/branch-1.0
Commit: 8684a15e50978480831d5f5e52684a61fe4ee7a6
Parents: e890771
Author: Mridul Muralidharan mridul...@apache.org
Authored: Wed Apr 23 23:20:55 2014 -0700
Committer: Aaron Davidson aa...@databricks.com
Committed: Wed Apr 23 23:21:16 2014 -0700

--
 .../apache/spark/metrics/MetricsSystem.scala| 22 ---
 .../spark/scheduler/TaskSchedulerImpl.scala |  1 +
 .../org/apache/spark/storage/BlockManager.scala |  2 ++
 .../apache/spark/storage/DiskBlockManager.scala | 28 
 .../spark/storage/ShuffleBlockManager.scala |  4 +++
 .../scala/org/apache/spark/ui/JettyUtils.scala  |  1 +
 .../spark/storage/DiskBlockManagerSuite.scala   |  5 
 7 files changed, 42 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8684a15e/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
--
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala 
b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index c5bda20..651511d 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -129,17 +129,19 @@ private[spark] class MetricsSystem private (val instance: 
String,
 
 sinkConfigs.foreach { kv =
   val classPath = kv._2.getProperty(class)
-  try {
-val sink = Class.forName(classPath)
-  .getConstructor(classOf[Properties], classOf[MetricRegistry], 
classOf[SecurityManager])
-  .newInstance(kv._2, registry, securityMgr)
-if (kv._1 == servlet) {
-   metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
-} else {
-  sinks += sink.asInstanceOf[Sink]
+  if (null != classPath) {
+try {
+  val sink = Class.forName(classPath)
+.getConstructor(classOf[Properties], classOf[MetricRegistry], 
classOf[SecurityManager])
+.newInstance(kv._2, registry, securityMgr)
+  if (kv._1 == servlet) {
+metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
+  } else {
+sinks += sink.asInstanceOf[Sink]
+  }
+} catch {
+  case e: Exception = logError(Sink class  + classPath +  cannot 
be instantialized, e)
 }
-  } catch {
-case e: Exception = logError(Sink class  + classPath +  cannot be 
instantialized, e)
   }
 }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/8684a15e/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index be19d9b..5a68f38 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -356,6 +356,7 @@ private[spark] class TaskSchedulerImpl(
 if (taskResultGetter != null) {
   taskResultGetter.stop()
 }
+starvationTimer.cancel()
 
 // sleeping for an arbitrary 1 seconds to ensure that messages are sent 
out.
 Thread.sleep(1000L)

http://git-wip-us.apache.org/repos/asf/spark/blob/8684a15e/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
--
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index f15fa4d..ccd5c53 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1021,6 +1021,8 @@ private[spark] class BlockManager(