[1/2] spark git commit: [SPARK-6132] ContextCleaner race condition across SparkContexts

2015-03-22 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 c5836816f -> 39761f515


[SPARK-6132] ContextCleaner race condition across SparkContexts

The problem is that `ContextCleaner` may clean variables that belong to a 
different `SparkContext`. This can happen if the `SparkContext` to which the 
cleaner belongs stops, and a new one is started immediately afterwards in the 
same JVM. In this case, if the cleaner is in the middle of cleaning a 
broadcast, for instance, it will do so through `SparkEnv.get.blockManager`, 
which could be one that belongs to a different `SparkContext`.
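
To make the scenario concrete, here is a minimal sketch of the driver-side usage pattern that can trigger the race. This is illustrative code, not part of the patch: the object name, the local[2] master, and the explicit System.gc() nudge are assumptions, and the failure is timing-dependent, so it will not reproduce deterministically.

import org.apache.spark.{SparkConf, SparkContext}

object CleanerRaceSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("cleaner-race")

    // First context: create a broadcast variable whose garbage collection
    // enqueues a CleanBroadcast task for this context's ContextCleaner.
    val sc1 = new SparkContext(conf)
    var broadcast = sc1.broadcast(Array.fill(1000)(1))
    broadcast = null   // drop the strong reference
    System.gc()        // encourage the weak reference to be enqueued
    sc1.stop()         // before this patch, stop() did not wait for the cleaner

    // Second context started immediately in the same JVM: SparkEnv.get now
    // points at sc2's environment. If sc1's cleaning thread is still mid-task,
    // it cleans through sc2's BlockManager, yielding block-not-found errors.
    val sc2 = new SparkContext(conf)
    sc2.parallelize(1 to 100).count()
    sc2.stop()
  }
}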

JoshRosen and I suspect that this is the cause of many flaky tests, most 
notably the `JavaAPISuite`. We were able to reproduce the failure locally, 
though it is nondeterministic and very hard to trigger.

Author: Andrew Or and...@databricks.com

Closes #4869 from andrewor14/cleaner-masquerade and squashes the following 
commits:

29168c0 [Andrew Or] Synchronize ContextCleaner stop


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e445ce61
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e445ce61
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e445ce61

Branch: refs/heads/branch-1.1
Commit: e445ce61d02778beaa30a2d1867e5b06fe09fb5d
Parents: c583681
Author: Andrew Or and...@databricks.com
Authored: Tue Mar 3 13:44:05 2015 -0800
Committer: Sean Owen so...@cloudera.com
Committed: Sun Mar 22 13:08:14 2015 +0000

----------------------------------------------------------------------
 .../scala/org/apache/spark/ContextCleaner.scala | 35 ++++++++++++++++++++++++-----------
 1 file changed, 24 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e445ce61/core/src/main/scala/org/apache/spark/ContextCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index ede1e23..201e5ec 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -104,9 +104,19 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     cleaningThread.start()
   }
 
-  /** Stop the cleaner. */
+  /**
+   * Stop the cleaning thread and wait until the thread has finished running its current task.
+   */
   def stop() {
     stopped = true
+    // Interrupt the cleaning thread, but wait until the current task has finished before
+    // doing so. This guards against the race condition where a cleaning thread may
+    // potentially clean similarly named variables created by a different SparkContext,
+    // resulting in otherwise inexplicable block-not-found exceptions (SPARK-6132).
+    synchronized {
+      cleaningThread.interrupt()
+    }
+    cleaningThread.join()
   }
 
   /** Register a RDD for cleanup when it is garbage collected. */
@@ -135,16 +145,19 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
       try {
         val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
           .map(_.asInstanceOf[CleanupTaskWeakReference])
-        reference.map(_.task).foreach { task =>
-          logDebug("Got cleaning task " + task)
-          referenceBuffer -= reference.get
-          task match {
-            case CleanRDD(rddId) =>
-              doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
-            case CleanShuffle(shuffleId) =>
-              doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
-            case CleanBroadcast(broadcastId) =>
-              doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
+        // Synchronize here to avoid being interrupted on stop()
+        synchronized {
+          reference.map(_.task).foreach { task =>
+            logDebug("Got cleaning task " + task)
+            referenceBuffer -= reference.get
+            task match {
+              case CleanRDD(rddId) =>
+                doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
+              case CleanShuffle(shuffleId) =>
+                doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
+              case CleanBroadcast(broadcastId) =>
+                doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
+            }
           }
         }
       } catch {
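
Stripped of Spark specifics, the fix is a classic pattern: the worker thread holds a lock for the duration of each task, and stop() takes the same lock before interrupting, so the interrupt can only be delivered between tasks. Below is a self-contained sketch with plain JVM threads; the names (StoppableCleaner, queue sizes, timeouts) are illustrative, not Spark APIs.

import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

class StoppableCleaner {
  @volatile private var stopped = false
  private val queue = new LinkedBlockingQueue[Runnable]()

  private val worker = new Thread("cleaner-sketch") {
    override def run(): Unit = {
      while (!stopped) {
        try {
          val task = Option(queue.poll(100, TimeUnit.MILLISECONDS))
          StoppableCleaner.this.synchronized {
            // While the lock is held, stop() cannot interrupt this thread,
            // so a task never observes an interrupt partway through.
            task.foreach(_.run())
          }
        } catch {
          case _: InterruptedException if stopped => // expected during shutdown
        }
      }
    }
  }

  def start(): Unit = worker.start()
  def submit(task: Runnable): Unit = queue.put(task)

  def stop(): Unit = {
    stopped = true
    synchronized {
      worker.interrupt()   // delivered only between tasks, never mid-task
    }
    // The join() is outside the synchronized block: the worker needs the lock
    // to finish its current task, so joining while holding it could deadlock.
    worker.join()
  }
}

The same reasoning explains why `cleaningThread.join()` in the patch sits after, not inside, the synchronized block.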




[1/2] spark git commit: [SPARK-6132] ContextCleaner race condition across SparkContexts

2015-03-13 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 9846790f4 -> 338bea7b3


[SPARK-6132] ContextCleaner race condition across SparkContexts

The problem is that `ContextCleaner` may clean variables that belong to a 
different `SparkContext`. This can happen if the `SparkContext` to which the 
cleaner belongs stops, and a new one is started immediately afterwards in the 
same JVM. In this case, if the cleaner is in the middle of cleaning a 
broadcast, for instance, it will do so through `SparkEnv.get.blockManager`, 
which could be one that belongs to a different `SparkContext`.

JoshRosen and I suspect that this is the cause of many flaky tests, most 
notably the `JavaAPISuite`. We were able to reproduce the failure locally, 
though it is nondeterministic and very hard to trigger.

Author: Andrew Or and...@databricks.com

Closes #4869 from andrewor14/cleaner-masquerade and squashes the following 
commits:

29168c0 [Andrew Or] Synchronize ContextCleaner stop


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3cdc8a35
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3cdc8a35
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3cdc8a35

Branch: refs/heads/branch-1.3
Commit: 3cdc8a35a7b9bbdf418988d0fe4524d413dce23c
Parents: 9846790
Author: Andrew Or and...@databricks.com
Authored: Tue Mar 3 13:44:05 2015 -0800
Committer: Sean Owen so...@cloudera.com
Committed: Fri Mar 13 18:20:50 2015 +0000

----------------------------------------------------------------------
 .../scala/org/apache/spark/ContextCleaner.scala | 35 ++++++++++++++++++++++++-----------
 1 file changed, 24 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3cdc8a35/core/src/main/scala/org/apache/spark/ContextCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index ede1e23..201e5ec 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -104,9 +104,19 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     cleaningThread.start()
   }
 
-  /** Stop the cleaner. */
+  /**
+   * Stop the cleaning thread and wait until the thread has finished running its current task.
+   */
   def stop() {
     stopped = true
+    // Interrupt the cleaning thread, but wait until the current task has finished before
+    // doing so. This guards against the race condition where a cleaning thread may
+    // potentially clean similarly named variables created by a different SparkContext,
+    // resulting in otherwise inexplicable block-not-found exceptions (SPARK-6132).
+    synchronized {
+      cleaningThread.interrupt()
+    }
+    cleaningThread.join()
   }
 
   /** Register a RDD for cleanup when it is garbage collected. */
@@ -135,16 +145,19 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
       try {
         val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
           .map(_.asInstanceOf[CleanupTaskWeakReference])
-        reference.map(_.task).foreach { task =>
-          logDebug("Got cleaning task " + task)
-          referenceBuffer -= reference.get
-          task match {
-            case CleanRDD(rddId) =>
-              doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
-            case CleanShuffle(shuffleId) =>
-              doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
-            case CleanBroadcast(broadcastId) =>
-              doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
+        // Synchronize here to avoid being interrupted on stop()
+        synchronized {
+          reference.map(_.task).foreach { task =>
+            logDebug("Got cleaning task " + task)
+            referenceBuffer -= reference.get
+            task match {
+              case CleanRDD(rddId) =>
+                doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
+              case CleanShuffle(shuffleId) =>
+                doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
+              case CleanBroadcast(broadcastId) =>
+                doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
+            }
          }
        }
      } catch {
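
The guarantee the patch provides can also be exercised directly: a task that is already running when stop() is called always completes before stop() returns, because stop() cannot acquire the lock (and therefore cannot interrupt) while the task holds it. A small usage sketch against the hypothetical StoppableCleaner shown after the branch-1.1 diff above:

import java.util.concurrent.atomic.AtomicBoolean

object StoppableCleanerDemo {
  def main(args: Array[String]): Unit = {
    val cleaner = new StoppableCleaner
    cleaner.start()

    val finished = new AtomicBoolean(false)
    cleaner.submit(new Runnable {
      override def run(): Unit = {
        Thread.sleep(500)   // simulate a slow cleanup, e.g. removing a broadcast
        finished.set(true)
      }
    })

    Thread.sleep(100)   // give the worker time to pick the task up
    cleaner.stop()      // blocks: interrupt waits on the lock, join waits for exit
    assert(finished.get, "stop() returned before the in-flight task completed")
  }
}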

