[1/2] spark git commit: [SPARK-6132] ContextCleaner race condition across SparkContexts
Repository: spark Updated Branches: refs/heads/branch-1.1 c5836816f - 39761f515 [SPARK-6132] ContextCleaner race condition across SparkContexts The problem is that `ContextCleaner` may clean variables that belong to a different `SparkContext`. This can happen if the `SparkContext` to which the cleaner belongs stops, and a new one is started immediately afterwards in the same JVM. In this case, if the cleaner is in the middle of cleaning a broadcast, for instance, it will do so through `SparkEnv.get.blockManager`, which could be one that belongs to a different `SparkContext`. JoshRosen and I suspect that this is the cause of many flaky tests, most notably the `JavaAPISuite`. We were able to reproduce the failure locally (though it is not deterministic and very hard to reproduce). Author: Andrew Or and...@databricks.com Closes #4869 from andrewor14/cleaner-masquerade and squashes the following commits: 29168c0 [Andrew Or] Synchronize ContextCleaner stop Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e445ce61 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e445ce61 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e445ce61 Branch: refs/heads/branch-1.1 Commit: e445ce61d02778beaa30a2d1867e5b06fe09fb5d Parents: c583681 Author: Andrew Or and...@databricks.com Authored: Tue Mar 3 13:44:05 2015 -0800 Committer: Sean Owen so...@cloudera.com Committed: Sun Mar 22 13:08:14 2015 + -- .../scala/org/apache/spark/ContextCleaner.scala | 35 ++-- 1 file changed, 24 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e445ce61/core/src/main/scala/org/apache/spark/ContextCleaner.scala -- diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index ede1e23..201e5ec 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ 
b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -104,9 +104,19 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { cleaningThread.start() } - /** Stop the cleaner. */ + /** + * Stop the cleaning thread and wait until the thread has finished running its current task. + */ def stop() { stopped = true +// Interrupt the cleaning thread, but wait until the current task has finished before +// doing so. This guards against the race condition where a cleaning thread may +// potentially clean similarly named variables created by a different SparkContext, +// resulting in otherwise inexplicable block-not-found exceptions (SPARK-6132). +synchronized { + cleaningThread.interrupt() +} +cleaningThread.join() } /** Register a RDD for cleanup when it is garbage collected. */ @@ -135,16 +145,19 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { try { val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT)) .map(_.asInstanceOf[CleanupTaskWeakReference]) -reference.map(_.task).foreach { task => - logDebug("Got cleaning task " + task) - referenceBuffer -= reference.get - task match { -case CleanRDD(rddId) => - doCleanupRDD(rddId, blocking = blockOnCleanupTasks) -case CleanShuffle(shuffleId) => - doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks) -case CleanBroadcast(broadcastId) => - doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks) +// Synchronize here to avoid being interrupted on stop() +synchronized { + reference.map(_.task).foreach { task => +logDebug("Got cleaning task " + task) +referenceBuffer -= reference.get +task match { + case CleanRDD(rddId) => +doCleanupRDD(rddId, blocking = blockOnCleanupTasks) + case CleanShuffle(shuffleId) => +doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks) + case CleanBroadcast(broadcastId) => +doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks) +} } } } catch { - To unsubscribe, e-mail: 
commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[1/2] spark git commit: [SPARK-6132] ContextCleaner race condition across SparkContexts
Repository: spark Updated Branches: refs/heads/branch-1.3 9846790f4 - 338bea7b3 [SPARK-6132] ContextCleaner race condition across SparkContexts The problem is that `ContextCleaner` may clean variables that belong to a different `SparkContext`. This can happen if the `SparkContext` to which the cleaner belongs stops, and a new one is started immediately afterwards in the same JVM. In this case, if the cleaner is in the middle of cleaning a broadcast, for instance, it will do so through `SparkEnv.get.blockManager`, which could be one that belongs to a different `SparkContext`. JoshRosen and I suspect that this is the cause of many flaky tests, most notably the `JavaAPISuite`. We were able to reproduce the failure locally (though it is not deterministic and very hard to reproduce). Author: Andrew Or and...@databricks.com Closes #4869 from andrewor14/cleaner-masquerade and squashes the following commits: 29168c0 [Andrew Or] Synchronize ContextCleaner stop Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3cdc8a35 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3cdc8a35 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3cdc8a35 Branch: refs/heads/branch-1.3 Commit: 3cdc8a35a7b9bbdf418988d0fe4524d413dce23c Parents: 9846790 Author: Andrew Or and...@databricks.com Authored: Tue Mar 3 13:44:05 2015 -0800 Committer: Sean Owen so...@cloudera.com Committed: Fri Mar 13 18:20:50 2015 + -- .../scala/org/apache/spark/ContextCleaner.scala | 35 ++-- 1 file changed, 24 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3cdc8a35/core/src/main/scala/org/apache/spark/ContextCleaner.scala -- diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index ede1e23..201e5ec 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ 
b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -104,9 +104,19 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { cleaningThread.start() } - /** Stop the cleaner. */ + /** + * Stop the cleaning thread and wait until the thread has finished running its current task. + */ def stop() { stopped = true +// Interrupt the cleaning thread, but wait until the current task has finished before +// doing so. This guards against the race condition where a cleaning thread may +// potentially clean similarly named variables created by a different SparkContext, +// resulting in otherwise inexplicable block-not-found exceptions (SPARK-6132). +synchronized { + cleaningThread.interrupt() +} +cleaningThread.join() } /** Register a RDD for cleanup when it is garbage collected. */ @@ -135,16 +145,19 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { try { val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT)) .map(_.asInstanceOf[CleanupTaskWeakReference]) -reference.map(_.task).foreach { task => - logDebug("Got cleaning task " + task) - referenceBuffer -= reference.get - task match { -case CleanRDD(rddId) => - doCleanupRDD(rddId, blocking = blockOnCleanupTasks) -case CleanShuffle(shuffleId) => - doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks) -case CleanBroadcast(broadcastId) => - doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks) +// Synchronize here to avoid being interrupted on stop() +synchronized { + reference.map(_.task).foreach { task => +logDebug("Got cleaning task " + task) +referenceBuffer -= reference.get +task match { + case CleanRDD(rddId) => +doCleanupRDD(rddId, blocking = blockOnCleanupTasks) + case CleanShuffle(shuffleId) => +doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks) + case CleanBroadcast(broadcastId) => +doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks) +} } } } catch { - To unsubscribe, e-mail: 
commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org